未验证 提交 9a0f6ae8 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #21595 from taosdata/fix/TS-3247

fix(query/sys): pause/resume with sys scan operator
......@@ -16,13 +16,13 @@
#ifndef TDENGINE_STORAGEAPI_H
#define TDENGINE_STORAGEAPI_H
#include "tsimplehash.h"
#include "tscalablebf.h"
#include "function.h"
#include "index.h"
#include "taosdef.h"
#include "tmsg.h"
#include "tcommon.h"
#include "index.h"
#include "function.h"
#include "tmsg.h"
#include "tscalablebf.h"
#include "tsimplehash.h"
#ifdef __cplusplus
extern "C" {
......@@ -46,7 +46,7 @@ typedef struct SMetaEntry {
int8_t type;
int8_t flags; // TODO: need refactor?
tb_uid_t uid;
char * name;
char* name;
union {
struct {
SSchemaWrapper schemaRow;
......@@ -57,43 +57,45 @@ typedef struct SMetaEntry {
int64_t ctime;
int32_t ttlDays;
int32_t commentLen;
char * comment;
char* comment;
tb_uid_t suid;
uint8_t *pTags;
uint8_t* pTags;
} ctbEntry;
struct {
int64_t ctime;
int32_t ttlDays;
int32_t commentLen;
char * comment;
char* comment;
int32_t ncid; // next column id
SSchemaWrapper schemaRow;
} ntbEntry;
struct {
STSma *tsma;
STSma* tsma;
} smaEntry;
};
uint8_t *pBuf;
uint8_t* pBuf;
} SMetaEntry;
typedef struct SMetaReader {
int32_t flags;
void * pMeta;
SDecoder coder;
SMetaEntry me;
void * pBuf;
int32_t szBuf;
struct SStoreMeta* pAPI;
int32_t flags;
void* pMeta;
SDecoder coder;
SMetaEntry me;
void* pBuf;
int32_t szBuf;
struct SStoreMeta* pAPI;
} SMetaReader;
typedef struct SMTbCursor {
void * pDbc;
void * pKey;
void * pVal;
void* pMeta;
void* pDbc;
void* pKey;
void* pVal;
int32_t kLen;
int32_t vLen;
SMetaReader mr;
int8_t paused;
} SMTbCursor;
typedef struct SRowBuffPos {
......@@ -107,22 +109,22 @@ typedef struct SRowBuffPos {
typedef struct SMetaTableInfo {
int64_t suid;
int64_t uid;
SSchemaWrapper *schema;
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN];
} SMetaTableInfo;
typedef struct SSnapContext {
SMeta * pMeta; // todo remove it
int64_t snapVersion;
void * pCur;
int64_t suid;
int8_t subType;
SHashObj * idVersion;
SHashObj * suidInfo;
SArray * idList;
int32_t index;
bool withMeta;
bool queryMeta; // true-get meta, false-get data
SMeta* pMeta; // todo remove it
int64_t snapVersion;
void* pCur;
int64_t suid;
int8_t subType;
SHashObj* idVersion;
SHashObj* suidInfo;
SArray* idList;
int32_t index;
bool withMeta;
bool queryMeta; // true-get meta, false-get data
} SSnapContext;
typedef struct {
......@@ -139,10 +141,9 @@ typedef struct {
// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
// bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
// bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
// SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx);
// int32_t setForSnapShot(SSnapContext *ctx, int64_t uid);
// int32_t destroySnapContext(SSnapContext *ctx);
// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t
// *uid); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); int32_t setForSnapShot(SSnapContext
// *ctx, int64_t uid); int32_t destroySnapContext(SSnapContext *ctx);
// clang-format off
/*-------------------------------------------------new api format---------------------------------------------------*/
......@@ -219,16 +220,16 @@ typedef struct SStoreTqReader {
bool (*tqReaderIsQueriedTable)();
bool (*tqReaderCurrentBlockConsumed)();
struct SWalReader *(*tqReaderGetWalReader)(); // todo remove it
int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it
struct SWalReader* (*tqReaderGetWalReader)(); // todo remove it
int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it
int32_t (*tqReaderSetSubmitMsg)(); // todo remove it
bool (*tqReaderNextBlockFilterOut)();
} SStoreTqReader;
typedef struct SStoreSnapshotFn {
int32_t (*createSnapshot)(SSnapContext *ctx, int64_t uid);
int32_t (*destroySnapshot)(SSnapContext *ctx);
int32_t (*createSnapshot)(SSnapContext* ctx, int64_t uid);
int32_t (*destroySnapshot)(SSnapContext* ctx);
SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx);
int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
} SStoreSnapshotFn;
......@@ -252,42 +253,54 @@ int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, in
int32_t payloadLen, double selectivityRatio);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t
payloadLen);
*/
typedef struct SStoreMeta {
SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor
int32_t (*cursorNext)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorNext
int32_t (*cursorPrev)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorPrev
int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList);
int32_t (*getTableTagsByUid)(void *pVnode, int64_t suid, SArray *uidList);
const void *(*extractTagVal)(const void *tag, int16_t type, STagVal *tagVal); // todo remove it
int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid);
int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType);
int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName);
bool (*isTableExisted)(void *pVnode, tb_uid_t uid);
int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen);
int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes);
int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio);
void *(*storeGetIndexInfo)();
void *(*getInvertIndex)(void* pVnode);
int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
void *storeGetVersionRange;
void *storeGetLastTimestamp;
int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema
SMTbCursor* (*openTableMetaCursor)(void* pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor* pTbCur); // metaCloseTbCursor
void (*pauseTableMetaCursor)(SMTbCursor* pTbCur); // metaPauseTbCursor
void (*resumeTableMetaCursor)(SMTbCursor* pTbCur, int8_t first); // metaResumeTbCursor
int32_t (*cursorNext)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorNext
int32_t (*cursorPrev)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorPrev
int32_t (*getTableTags)(void* pVnode, uint64_t suid, SArray* uidList);
int32_t (*getTableTagsByUid)(void* pVnode, int64_t suid, SArray* uidList);
const void* (*extractTagVal)(const void* tag, int16_t type, STagVal* tagVal); // todo remove it
int32_t (*getTableUidByName)(void* pVnode, char* tbName, uint64_t* uid);
int32_t (*getTableTypeByName)(void* pVnode, char* tbName, ETableType* tbType);
int32_t (*getTableNameByUid)(void* pVnode, uint64_t uid, char* tbName);
bool (*isTableExisted)(void* pVnode, tb_uid_t uid);
int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen);
int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
bool* acquireRes);
int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen, double selectivityRatio);
void* (*storeGetIndexInfo)();
void* (*getInvertIndex)(void* pVnode);
int32_t (*getChildTableList)(
void* pVnode, int64_t suid,
SArray* list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
void* storeGetVersionRange;
void* storeGetLastTimestamp;
int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema
// db name, vgId, numOfTables, numOfSTables
int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
void (*getBasicInfo)(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables);// vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
int32_t (*getNumOfChildTables)(
void* pVnode, int64_t uid,
int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
int64_t (*getNumOfRowsInMem)(void* pVnode);
/**
......@@ -298,24 +311,24 @@ int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
} SStoreMeta;
typedef struct SStoreMetaReader {
void (*initReader)(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI);
void (*clearReader)(SMetaReader *pReader);
void (*readerReleaseLock)(SMetaReader *pReader);
int32_t (*getTableEntryByUid)(SMetaReader *pReader, tb_uid_t uid);
int32_t (*getTableEntryByName)(SMetaReader *pReader, const char *name);
int32_t (*getEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid);
void (*initReader)(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI);
void (*clearReader)(SMetaReader* pReader);
void (*readerReleaseLock)(SMetaReader* pReader);
int32_t (*getTableEntryByUid)(SMetaReader* pReader, tb_uid_t uid);
int32_t (*getTableEntryByName)(SMetaReader* pReader, const char* name);
int32_t (*getEntryGetUidCache)(SMetaReader* pReader, tb_uid_t uid);
} SStoreMetaReader;
typedef struct SUpdateInfo {
SArray *pTsBuckets;
SArray* pTsBuckets;
uint64_t numBuckets;
SArray *pTsSBFs;
SArray* pTsSBFs;
uint64_t numSBFs;
int64_t interval;
int64_t watermark;
TSKEY minTS;
SScalableBf *pCloseWinSBF;
SHashObj *pMap;
SScalableBf* pCloseWinSBF;
SHashObj* pMap;
uint64_t maxDataVersion;
} SUpdateInfo;
......@@ -334,15 +347,15 @@ typedef struct SStateStore {
int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateReleaseBuf)(SStreamState* pState, const SWinKey* key, void* pVal);
void (*streamStateFreeVal)(void* val);
void (*streamStateFreeVal)(void* val);
int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t (*streamStateGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key);
bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key);
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key);
int32_t (*streamStateClear)(SStreamState* pState);
void (*streamStateSetNumber)(SStreamState* pState, int32_t number);
void (*streamStateSetNumber)(SStreamState* pState, int32_t number);
int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
......@@ -353,36 +366,37 @@ typedef struct SStateStore {
int32_t (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur);
int32_t (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur);
SStreamStateCur* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key);
SStreamStateCur* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key);
SStreamStateCur* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key);
SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key);
void (*streamStateFreeCur)(SStreamStateCur* pCur);
SStreamStateCur* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key);
SStreamStateCur* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key);
SStreamStateCur* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key);
SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key);
void (*streamStateFreeCur)(SStreamStateCur* pCur);
int32_t (*streamStateGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen);
int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key);
int32_t (*streamStateSessionClear)(SStreamState* pState);
int32_t (*streamStateSessionGetKVByCur)(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark);
TSKEY (*updateInfoFillBlockData)(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
bool (*updateInfoIsUpdated)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
bool (*updateInfoIsTableInserted)(SUpdateInfo *pInfo, int64_t tbUid);
void (*updateInfoDestroy)(SUpdateInfo *pInfo);
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark);
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
void (*updateInfoDestroy)(SUpdateInfo* pInfo);
SUpdateInfo* (*updateInfoInitP)(SInterval *pInterval, int64_t watermark);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo *pInfo);
void (*updateInfoDestoryColseWinSBF)(SUpdateInfo *pInfo);
int32_t (*updateInfoSerialize)(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
int32_t (*updateInfoDeserialize)(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo);
int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo);
int32_t (*updateInfoDeserialize)(void* buf, int32_t bufLen, SUpdateInfo* pInfo);
SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key);
......@@ -396,11 +410,11 @@ typedef struct SStateStore {
bool (*needClearDiskBuff)(struct SStreamFileState* pFileState);
SStreamState* (*streamStateOpen)(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
void (*streamStateClose)(SStreamState* pState, bool remove);
int32_t (*streamStateBegin)(SStreamState* pState);
int32_t (*streamStateCommit)(SStreamState* pState);
void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
void (*streamStateClose)(SStreamState* pState, bool remove);
int32_t (*streamStateBegin)(SStreamState* pState);
int32_t (*streamStateCommit)(SStreamState* pState);
void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
} SStateStore;
typedef struct SStorageAPI {
......
......@@ -109,6 +109,8 @@ typedef SVCreateTSmaReq SSmaCfg;
SMTbCursor* metaOpenTbCursor(void* pVnode);
void metaCloseTbCursor(SMTbCursor* pTbCur);
void metaPauseTbCursor(SMTbCursor* pTbCur);
void metaResumeTbCursor(SMTbCursor* pTbCur, int8_t first);
int32_t metaTbCursorNext(SMTbCursor* pTbCur, ETableType jumpTableType);
int32_t metaTbCursorPrev(SMTbCursor* pTbCur, ETableType jumpTableType);
......
......@@ -17,8 +17,8 @@
#include "osMemory.h"
#include "tencode.h"
void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta *pAPI) {
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
metaReaderInit(pReader, pMeta, flags);
pReader->pAPI = pAPI;
}
......@@ -71,7 +71,7 @@ _err:
}
bool metaIsTableExist(void *pVnode, tb_uid_t uid) {
SVnode* pVnodeObj = pVnode;
SVnode *pVnodeObj = pVnode;
metaRLock(pVnodeObj->pMeta); // query uid.idx
if (tdbTbGet(pVnodeObj->pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) {
......@@ -143,7 +143,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
......@@ -179,7 +179,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
SMetaReader *pReader = &mr;
// query name.idx
if (tdbTbGet(((SMeta*)pReader->pMeta)->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
if (tdbTbGet(((SMeta *)pReader->pMeta)->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
metaReaderClear(&mr);
return -1;
......@@ -195,7 +195,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
code = metaGetTableEntryByName(&mr, tbName);
if (code == 0) *tbType = mr.me.type;
......@@ -221,12 +221,13 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) {
return NULL;
}
SVnode* pVnodeObj = pVnode;
metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0);
tdbTbcOpen(pVnodeObj->pMeta->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
SVnode *pVnodeObj = pVnode;
// metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0);
tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
// tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
pTbCur->pMeta = pVnodeObj->pMeta;
pTbCur->paused = 1;
metaResumeTbCursor(pTbCur, 1);
return pTbCur;
}
......@@ -234,14 +235,45 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
if (pTbCur) {
tdbFree(pTbCur->pKey);
tdbFree(pTbCur->pVal);
metaReaderClear(&pTbCur->mr);
if (pTbCur->pDbc) {
tdbTbcClose((TBC *)pTbCur->pDbc);
if (!pTbCur->paused) {
metaReaderClear(&pTbCur->mr);
if (pTbCur->pDbc) {
tdbTbcClose((TBC *)pTbCur->pDbc);
}
}
taosMemoryFree(pTbCur);
}
}
void metaPauseTbCursor(SMTbCursor *pTbCur) {
if (!pTbCur->paused) {
metaReaderClear(&pTbCur->mr);
tdbTbcClose((TBC *)pTbCur->pDbc);
pTbCur->paused = 1;
}
}
void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first) {
if (pTbCur->paused) {
metaReaderInit(&pTbCur->mr, pTbCur->pMeta, 0);
tdbTbcOpen(((SMeta *)pTbCur->pMeta)->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
if (first) {
tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
} else {
int c = 0;
tdbTbcMoveTo(pTbCur->pDbc, pTbCur->pKey, pTbCur->kLen, &c);
if (c < 0) {
tdbTbcMoveToPrev(pTbCur->pDbc);
} else {
tdbTbcMoveToNext(pTbCur->pDbc);
}
}
pTbCur->paused = 0;
}
}
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
int ret;
void *pBuf;
......@@ -974,7 +1006,7 @@ typedef struct {
} SIdxCursor;
int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
SMeta *pMeta = ((SVnode*)pVnode)->pMeta;
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
SMetaFltParam *param = arg;
int32_t ret = 0;
......@@ -1034,7 +1066,7 @@ END:
}
int32_t metaFilterTableName(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
SMeta *pMeta = ((SVnode*)pVnode)->pMeta;
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
SMetaFltParam *param = arg;
int32_t ret = 0;
char *buf = NULL;
......@@ -1101,7 +1133,7 @@ END:
return ret;
}
int32_t metaFilterTtl(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
SMeta *pMeta = ((SVnode*)pVnode)->pMeta;
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
SMetaFltParam *param = arg;
int32_t ret = 0;
char *buf = NULL;
......@@ -1132,7 +1164,7 @@ END:
return 0;
}
int32_t metaFilterTableIds(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
SMeta *pMeta = ((SVnode*)pVnode)->pMeta;
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
SMetaFltParam *param = arg;
SMetaEntry oStbEntry = {0};
......@@ -1318,7 +1350,7 @@ static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, voi
}
int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) {
SMeta* pMeta = ((SVnode*) pVnode)->pMeta;
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
const int32_t LIMIT = 128;
int32_t isLock = false;
......@@ -1350,8 +1382,8 @@ int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) {
return 0;
}
int32_t metaGetTableTags(void* pVnode, uint64_t suid, SArray *pUidTagInfo) {
SMCtbCursor *pCur = metaOpenCtbCursor(((SVnode*)pVnode)->pMeta, suid, 1);
int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) {
SMCtbCursor *pCur = metaOpenCtbCursor(((SVnode *)pVnode)->pMeta, suid, 1);
// If len > 0 means there already have uids, and we only want the
// tags of the specified tables, of which uid in the uid list. Otherwise, all table tags are retrieved and kept
......@@ -1456,11 +1488,11 @@ _exit:
return code;
}
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t* numOfTables) {
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) {
int32_t code = 0;
*numOfTables = 0;
SVnode* pVnodeObj = pVnode;
SVnode *pVnodeObj = pVnode;
metaRLock(pVnodeObj->pMeta);
// fast path: search cache
......
......@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "storageapi.h"
#include "vnodeInt.h"
#include "tstreamUpdate.h"
#include "meta.h"
#include "vnodeInt.h"
static void initTsdbReaderAPI(TsdReader* pReader);
static void initMetadataAPI(SStoreMeta* pMeta);
......@@ -56,10 +56,10 @@ void initTsdbReaderAPI(TsdReader* pReader) {
pReader->tsdReaderResetStatus = tsdbReaderReset;
pReader->tsdReaderGetDataBlockDistInfo = tsdbGetFileBlocksDistInfo;
pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable; // todo this function should be moved away
pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable; // todo this function should be moved away
pReader->tsdSetQueryTableList = tsdbSetTableList;
pReader->tsdSetReaderTaskId = (void (*)(void *, const char *))tsdbReaderSetId;
pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId;
}
void initMetadataAPI(SStoreMeta* pMeta) {
......@@ -67,6 +67,8 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->openTableMetaCursor = metaOpenTbCursor;
pMeta->closeTableMetaCursor = metaCloseTbCursor;
pMeta->pauseTableMetaCursor = metaPauseTbCursor;
pMeta->resumeTableMetaCursor = metaResumeTbCursor;
pMeta->cursorNext = metaTbCursorNext;
pMeta->cursorPrev = metaTbCursorPrev;
......@@ -78,7 +80,7 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->storeGetIndexInfo = vnodeGetIdx;
pMeta->getInvertIndex = vnodeGetIvtIdx;
pMeta->extractTagVal = (const void *(*)(const void *, int16_t, STagVal *))metaGetTableTagVal;
pMeta->extractTagVal = (const void* (*)(const void*, int16_t, STagVal*))metaGetTableTagVal;
pMeta->getTableTags = metaGetTableTags;
pMeta->getTableTagsByUid = metaGetTableTagsByUids;
......@@ -86,7 +88,7 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->getTableTypeByName = metaGetTableTypeByName;
pMeta->getTableNameByUid = metaGetTableNameByUid;
pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor
pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor
pMeta->storeGetTableList = vnodeGetTableList;
pMeta->getCachedTableList = metaGetCachedTableUidList;
......@@ -106,7 +108,7 @@ void initTqAPI(SStoreTqReader* pTq) {
pTq->tqReaderNextBlockInWal = tqNextBlockInWal;
pTq->tqNextBlockImpl = tqNextBlockImpl;// todo remove it
pTq->tqNextBlockImpl = tqNextBlockImpl; // todo remove it
pTq->tqReaderAddTables = tqReaderAddTbUidList;
pTq->tqReaderSetQueryTableList = tqReaderSetTbUidList;
......@@ -116,10 +118,10 @@ void initTqAPI(SStoreTqReader* pTq) {
pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable;
pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed;
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
pTq->tqGetResultBlock = tqGetResultBlock;
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
......@@ -199,7 +201,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateClose = streamStateClose;
pStore->streamStateBegin = streamStateBegin;
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy= streamStateDestroy;
pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
}
......@@ -239,4 +241,4 @@ void initSnapshotFn(SStoreSnapshotFn* pSnapshot) {
pSnapshot->destroySnapshot = destroySnapContext;
pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot;
pSnapshot->getTableInfoFromSnapshot = getTableInfoFromSnapshot;
}
\ No newline at end of file
}
......@@ -25,6 +25,7 @@
#include "tdatablock.h"
#include "tmsg.h"
#include "index.h"
#include "operator.h"
#include "query.h"
#include "querytask.h"
......@@ -32,7 +33,6 @@
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "index.h"
typedef int (*__optSysFilter)(void* a, void* b, int16_t dtype);
typedef int32_t (*__sys_filte)(void* pMeta, SNode* cond, SArray* result);
......@@ -540,12 +540,12 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
int32_t restore = pInfo->restore;
pInfo->restore = false;
while (restore || ((ret = pAPI->metaFn.cursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
if (restore) {
restore = false;
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
......@@ -626,8 +626,8 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
}
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSysTableScanInfo* pInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
......@@ -1100,8 +1100,8 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
}
static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSysTableScanInfo* pInfo = pOperator->info;
......@@ -1288,11 +1288,16 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int8_t firstMetaCursor = 0;
SSysTableScanInfo* pInfo = pOperator->info;
if (pInfo->pCur == NULL) {
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
firstMetaCursor = 1;
}
if (!firstMetaCursor) {
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0);
}
blockDataCleanup(pInfo->pRes);
......@@ -1436,12 +1441,14 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
numOfRows = 0;
if (pInfo->pRes->info.rows > 0) {
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
break;
}
}
}
if (numOfRows > 0) {
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
......@@ -1485,7 +1492,8 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
} else {
if (pInfo->showRewrite == false) {
if (pCondition != NULL && pInfo->pIdx == NULL) {
SSTabFltArg arg = {.pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI};
SSTabFltArg arg = {
.pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI};
SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex));
idx->init = 0;
......@@ -1827,7 +1835,7 @@ void destroySysScanOperator(void* param) {
pInfo->pIdx = NULL;
}
if(pInfo->pSchema) {
if (pInfo->pSchema) {
taosHashCleanup(pInfo->pSchema);
pInfo->pSchema = NULL;
}
......@@ -2144,7 +2152,7 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
return -1;
}
static int32_t doGetTableRowSize(SReadHandle *pHandle, uint64_t uid, int32_t* rowLen, const char* idstr) {
static int32_t doGetTableRowSize(SReadHandle* pHandle, uint64_t uid, int32_t* rowLen, const char* idstr) {
*rowLen = 0;
SMetaReader mr = {0};
......@@ -2194,17 +2202,17 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SBlockDistInfo* pBlockScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
int32_t code = doGetTableRowSize(&pBlockScanInfo->readHandle, pBlockScanInfo->uid,
(int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo));
int32_t code = doGetTableRowSize(&pBlockScanInfo->readHandle, pBlockScanInfo->uid, (int32_t*)&blockDistInfo.rowSize,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
pAPI->tsdReader.tsdReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
blockDistInfo.numOfInmemRows = (int32_t) pAPI->tsdReader.tsdReaderGetNumOfInMemRows(pBlockScanInfo->pHandle);
blockDistInfo.numOfInmemRows = (int32_t)pAPI->tsdReader.tsdReaderGetNumOfInMemRows(pBlockScanInfo->pHandle);
SSDataBlock* pBlock = pBlockScanInfo->pResBlock;
......@@ -2289,7 +2297,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, (void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL);
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,
(void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL);
cleanupQueryTableDataCond(&cond);
if (code != 0) {
goto _error;
......@@ -2316,4 +2325,4 @@ _error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册