未验证 提交 4cd6278f 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #13479 from taosdata/feat/row_refact

feat: vnode multi-version
......@@ -34,6 +34,7 @@ typedef struct SValue SValue;
typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2;
typedef struct STSRowBuilder STSRowBuilder;
typedef struct SColData SColData;
typedef struct STagVal STagVal;
typedef struct STag STag;
......@@ -41,6 +42,9 @@ typedef struct STag STag;
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
void tTSchemaDestroy(STSchema *pTSchema);
// SValue
int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type);
// STSRow2
#define COL_VAL_NONE(CID) ((SColVal){.cid = (CID), .isNone = 1})
#define COL_VAL_NULL(CID) ((SColVal){.cid = (CID), .isNull = 1})
......@@ -166,6 +170,12 @@ struct STag {
};
#pragma pack(pop)
struct SColData {
int16_t cid;
uint32_t nData;
uint8_t *pData;
};
#if 1 //================================================================================================================================================
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP
......
......@@ -36,8 +36,6 @@ typedef struct {
#define GET_BIT1(p, i) (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1))
#define GET_BIT2(p, i) (((p)[(i) / 4] >> ((i) % 4)) & ((uint8_t)3))
static FORCE_INLINE int tSKVIdxCmprFn(const void *p1, const void *p2);
// SValue
static FORCE_INLINE int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type) {
int32_t n = 0;
......@@ -141,6 +139,11 @@ static FORCE_INLINE int32_t tGetValue(uint8_t *p, SValue *pValue, int8_t type) {
return n;
}
int tValueCmprFn(const SValue *pValue1, const SValue *pValue2, int8_t type) {
// TODO
return 0;
}
// STSRow2 ========================================================================
static void setBitMap(uint8_t *pb, uint8_t v, int32_t idx, uint8_t flags) {
if (pb) {
......
......@@ -36,6 +36,7 @@ target_sources(
# tsdb
"src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbCommit2.c"
"src/tsdb/tsdbFile.c"
"src/tsdb/tsdbFS.c"
"src/tsdb/tsdbOpen.c"
......
......@@ -34,18 +34,97 @@ extern "C" {
typedef struct TSDBROW TSDBROW;
typedef struct TSDBKEY TSDBKEY;
typedef struct TABLEID TABLEID;
typedef struct SDelOp SDelOp;
static int tsdbKeyCmprFn(const void *p1, const void *p2);
// tsdbMemTable2.c ==============================================================================================
typedef struct SMemTable SMemTable;
typedef struct SMemData SMemData;
typedef struct SMemDataIter SMemDataIter;
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy2(SMemTable *pMemTable);
int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk);
int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
/* SMemDataIter */
void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter);
bool tsdbMemDataIterNext(SMemDataIter *pIter);
void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow);
// tsdbCommit2.c ==============================================================================================
int32_t tsdbBegin2(STsdb *pTsdb);
int32_t tsdbCommit2(STsdb *pTsdb);
// tsdbFile.c ==============================================================================================
typedef int32_t TSDB_FILE_T;
typedef struct SDFInfo SDFInfo;
typedef struct SDFile SDFile;
typedef struct SDFileSet SDFileSet;
// SDFile
void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile);
int tsdbOpenDFile(SDFile *pDFile, int flags);
void tsdbCloseDFile(SDFile *pDFile);
int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence);
int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte);
void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm);
int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset);
int tsdbRemoveDFile(SDFile *pDFile);
int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte);
int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest);
int tsdbEncodeSDFile(void **buf, SDFile *pDFile);
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile);
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType);
int tsdbUpdateDFileHeader(SDFile *pDFile);
int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo);
int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version);
// SDFileSet
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver);
void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet);
int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet);
void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet);
int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet);
void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet);
int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to);
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet *pSet);
int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet);
void tsdbCloseDFileSet(SDFileSet *pSet);
int tsdbOpenDFileSet(SDFileSet *pSet, int flags);
void tsdbRemoveDFileSet(SDFileSet *pSet);
int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest);
void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey);
// tsdbFS.c ==============================================================================================
typedef struct STsdbFS STsdbFS;
typedef struct SFSIter SFSIter;
typedef struct STsdbFSMeta STsdbFSMeta;
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg);
void *tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdb *pRepo);
void tsdbCloseFS(STsdb *pRepo);
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
int tsdbEndFSTxn(STsdb *pRepo);
int tsdbEndFSTxnWithError(STsdbFS *pfs);
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
void tsdbFSIterSeek(SFSIter *pIter, int fid);
SDFileSet *tsdbFSIterNext(SFSIter *pIter);
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
int tsdbRLockFS(STsdbFS *pFs);
int tsdbWLockFS(STsdbFS *pFs);
int tsdbUnLockFS(STsdbFS *pFs);
// tsdbMemTable ================
typedef struct STsdbRow STsdbRow;
typedef struct STbData STbData;
typedef struct STsdbMemTable STsdbMemTable;
typedef struct SMergeInfo SMergeInfo;
......@@ -56,15 +135,6 @@ void tsdbMemTableDestroy(STsdbMemTable *pMemTable);
int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
// tsdbCommit ================
// tsdbFS ================
typedef struct STsdbFS STsdbFS;
// tsdbSma ================
typedef struct SSmaEnv SSmaEnv;
typedef struct SSmaEnvs SSmaEnvs;
// structs
typedef struct {
int minFid;
......@@ -99,7 +169,7 @@ struct STable {
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
int tsdbPrepareCommit(STsdb *pTsdb);
// int tsdbPrepareCommit(STsdb *pTsdb);
typedef enum {
TSDB_FILE_HEAD = 0, // .head
TSDB_FILE_DATA, // .data
......@@ -110,7 +180,7 @@ typedef enum {
TSDB_FILE_META, // meta
} E_TSDB_FILE_T;
typedef struct {
struct SDFInfo {
uint32_t magic;
uint32_t fver;
uint32_t len;
......@@ -119,22 +189,22 @@ typedef struct {
uint32_t offset;
uint64_t size;
uint64_t tombSize;
} SDFInfo;
};
typedef struct {
struct SDFile {
SDFInfo info;
STfsFile f;
TdFilePtr pFile;
uint8_t state;
} SDFile;
};
typedef struct {
struct SDFileSet {
int fid;
int8_t state; // -128~127
uint8_t ver; // 0~255, DFileSet version
uint16_t reserve;
SDFile files[TSDB_FILE_MAX];
} SDFileSet;
};
struct STbData {
tb_uid_t uid;
......@@ -159,11 +229,11 @@ struct STsdbMemTable {
SHashObj *pHashIdx;
};
typedef struct {
struct STsdbFSMeta {
uint32_t version; // Commit version from 0 to increase
int64_t totalPoints; // total points
int64_t totalStorage; // Uncompressed total storage
} STsdbFSMeta;
};
// ==================
typedef struct {
......@@ -176,8 +246,6 @@ struct STsdbFS {
TdThreadRwlock lock;
SFSStatus *cstatus; // current status
SHashObj *metaCache; // meta cache
SHashObj *metaCacheComp; // meta cache for compact
bool intxn;
SFSStatus *nstatus; // new status
};
......@@ -243,31 +311,21 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
// tsdbReadImpl
typedef struct SReadH SReadH;
struct TSDBKEY {
int64_t version;
TSKEY ts;
};
typedef struct {
uint64_t suid;
uint64_t uid;
uint32_t len;
uint32_t offset;
uint32_t hasLast : 2;
uint32_t numOfBlocks : 30;
uint64_t uid;
TSKEY maxKey;
TSDBKEY maxKey;
} SBlockIdx;
#ifdef TD_REFACTOR_3
typedef struct {
int64_t last : 1;
int64_t offset : 63;
int32_t algorithm : 8;
int32_t numOfRows : 24;
int32_t len;
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
int16_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
TSKEY keyFirst;
TSKEY keyLast;
} SBlock;
#else
typedef enum {
TSDB_SBLK_VER_0 = 0,
TSDB_SBLK_VER_MAX,
......@@ -290,16 +348,9 @@ typedef struct {
int64_t offset;
uint64_t aggrStat : 1;
uint64_t aggrOffset : 63;
TSKEY keyFirst;
TSKEY keyLast;
} SBlockV0;
#define SBlock SBlockV0 // latest SBlock definition
static FORCE_INLINE bool tsdbIsSupBlock(SBlock *pBlock) { return pBlock->numOfSubBlocks == 1; }
static FORCE_INLINE bool tsdbIsSubBlock(SBlock *pBlock) { return pBlock->numOfSubBlocks == 0; }
#endif
TSDBKEY minKey;
TSDBKEY maxKey;
} SBlock;
typedef struct {
int32_t delimiter; // For recovery usage
......@@ -308,33 +359,13 @@ typedef struct {
SBlock blocks[];
} SBlockInfo;
#ifdef TD_REFACTOR_3
typedef struct {
int16_t colId;
uint16_t bitmap : 1; // 0: no bitmap if all rows are NORM, 1: has bitmap if has NULL/NORM rows
uint16_t reserve : 15;
int32_t len;
uint32_t type : 8;
uint32_t offset : 24;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
uint8_t offsetH;
char padding[1];
} SBlockCol;
#else
typedef struct {
int16_t colId;
uint16_t type : 6;
uint16_t blen : 10; // 0 no bitmap if all rows are NORM, > 0 bitmap length
uint32_t len; // data length + bitmap length
uint32_t offset;
} SBlockColV0;
#define SBlockCol SBlockColV0 // latest SBlockCol definition
} SBlockCol;
typedef struct {
int16_t colId;
......@@ -344,31 +375,7 @@ typedef struct {
int64_t sum;
int64_t max;
int64_t min;
} SAggrBlkColV0;
#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition
#endif
// Code here just for back-ward compatibility
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
#ifdef TD_REFACTOR_3
pBlockCol->offset = offset & ((((uint32_t)1) << 24) - 1);
pBlockCol->offsetH = (uint8_t)(offset >> 24);
#else
pBlockCol->offset = offset;
#endif
}
static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) {
#ifdef TD_REFACTOR_3
uint32_t offset1 = pBlockCol->offset;
uint32_t offset2 = pBlockCol->offsetH;
return (offset1 | (offset2 << 24));
#else
return pBlockCol->offset;
#endif
}
} SAggrBlkCol;
typedef struct {
int32_t delimiter; // For recovery usage
......@@ -409,8 +416,7 @@ struct SReadH {
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
switch (blkVer) {
......@@ -420,7 +426,7 @@ static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
}
}
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkCol) * (ncols) + sizeof(TSCKSUM))
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
switch (blkVer) {
......@@ -518,11 +524,11 @@ static FORCE_INLINE void *taosTZfree(void *ptr) {
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t minutes, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerMin[precision] / days - 1);
return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1);
} else {
return (int)((key / tsTickPerMin[precision] / days));
return (int)((key / tsTickPerMin[precision] / minutes));
}
}
......@@ -546,7 +552,6 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
#define TSDB_FILE_STATE_OK 0
#define TSDB_FILE_STATE_BAD 1
#define TSDB_FILE_INFO(tf) (&((tf)->info))
#define TSDB_FILE_F(tf) (&((tf)->f))
#define TSDB_FILE_PFILE(tf) ((tf)->pFile)
#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname)
......@@ -564,7 +569,6 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
typedef int32_t TSDB_FILE_T;
typedef enum {
TSDB_FS_VER_0 = 0,
TSDB_FS_VER_MAX,
......@@ -585,129 +589,9 @@ static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest v
}
}
void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile);
int tsdbEncodeSDFile(void **buf, SDFile *pDFile);
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile);
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType);
int tsdbUpdateDFileHeader(SDFile *pDFile);
int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo);
int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version);
static FORCE_INLINE void tsdbSetDFileInfo(SDFile *pDFile, SDFInfo *pInfo) { pDFile->info = *pInfo; }
static FORCE_INLINE int tsdbOpenDFile(SDFile *pDFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pDFile));
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), flags);
if (pDFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
static FORCE_INLINE void tsdbCloseDFile(SDFile *pDFile) {
if (TSDB_FILE_OPENED(pDFile)) {
taosCloseFile(&pDFile->pFile);
TSDB_FILE_SET_CLOSED(pDFile);
}
}
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) {
// ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t loffset = taosLSeekFile(TSDB_FILE_PFILE(pDFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
}
static FORCE_INLINE int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite = taosWriteFile(pDFile->pFile, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
}
static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm) {
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
}
static FORCE_INLINE int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t toffset;
if ((toffset = tsdbSeekDFile(pDFile, 0, SEEK_END)) < 0) {
return -1;
}
ASSERT(pDFile->info.size == toffset);
if (offset) {
*offset = toffset;
}
if (tsdbWriteDFile(pDFile, buf, nbyte) < 0) {
return -1;
}
pDFile->info.size += nbyte;
return (int)nbyte;
}
static FORCE_INLINE int tsdbRemoveDFile(SDFile *pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
static FORCE_INLINE int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nread = taosReadFile(pDFile->pFile, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
}
static FORCE_INLINE int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest) {
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
tsdbSetDFileInfo(pDest, TSDB_FILE_INFO(pSrc));
return 0;
}
// =============== SDFileSet
typedef struct {
int fid;
int8_t state;
uint8_t ver;
uint16_t reserve;
#if 0
SDFInfo info;
#endif
STfsFile f;
TdFilePtr pFile;
} SSFile; // files split by days with fid
#define TSDB_LATEST_FSET_VER 0
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_FSET_STATE(s) ((s)->state)
#define TSDB_FSET_VER(s) ((s)->ver)
......@@ -727,55 +611,6 @@ typedef struct {
} \
} while (0);
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver);
void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet);
int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet);
void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet);
int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet);
void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet);
int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to);
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet *pSet);
int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype));
}
}
static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet *pSet, int flags) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) {
tsdbCloseDFileSet(pSet);
return -1;
}
}
return 0;
}
static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
(void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
}
}
static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
tsdbRemoveDFileSet(pDest);
return -1;
}
}
return 0;
}
static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fid * days * tsTickPerMin[precision];
*maxKey = *minKey + days * tsTickPerMin[precision] - 1;
}
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) {
......@@ -803,69 +638,26 @@ typedef struct {
#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version)
#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version)
typedef struct {
struct SFSIter {
int direction;
uint64_t version; // current FS version
STsdbFS *pfs;
int index; // used to position next fset when version the same
int fid; // used to seek when version is changed
SDFileSet *pSet;
} SFSIter;
};
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg);
void *tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdb *pRepo);
void tsdbCloseFS(STsdb *pRepo);
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
int tsdbEndFSTxn(STsdb *pRepo);
int tsdbEndFSTxnWithError(STsdbFS *pfs);
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
void tsdbFSIterSeek(SFSIter *pIter, int fid);
SDFileSet *tsdbFSIterNext(SFSIter *pIter);
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
int code = taosThreadRwlockRdlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) {
int code = taosThreadRwlockWrlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
int code = taosThreadRwlockUnlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
struct TSDBROW {
int64_t version;
STSRow2 tsRow;
};
struct TSDBKEY {
int64_t version;
TSKEY ts;
struct TABLEID {
tb_uid_t suid;
tb_uid_t uid;
};
struct SDelOp {
......@@ -875,6 +667,25 @@ struct SDelOp {
SDelOp *pNext;
};
typedef struct {
tb_uid_t suid;
tb_uid_t uid;
int64_t version;
TSKEY sKey;
TSKEY eKey;
} SDelInfo;
struct SMemTable {
STsdb *pTsdb;
int32_t nRef;
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t nRows;
int64_t nDelOp;
SArray *aSkmInfo;
SArray *aMemData;
};
static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
TSDBKEY *pKey2 = (TSDBKEY *)p2;
......@@ -894,6 +705,34 @@ static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
return 0;
}
typedef struct SMemSkipListNode SMemSkipListNode;
typedef struct SMemSkipList {
uint32_t seed;
int32_t size;
int8_t maxLevel;
int8_t level;
SMemSkipListNode *pHead;
SMemSkipListNode *pTail;
} SMemSkipList;
struct SMemData {
tb_uid_t suid;
tb_uid_t uid;
TSDBKEY minKey;
TSDBKEY maxKey;
SDelOp *delOpHead;
SDelOp *delOpTail;
SMemSkipList sl;
};
struct SMemDataIter {
SMemData *pMemData;
int8_t backward;
TSDBROW *pRow;
SMemSkipListNode *pNode; // current node
TSDBROW row;
};
#endif
#ifdef __cplusplus
......
......@@ -87,7 +87,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp *pMetaRsp);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
......@@ -112,7 +112,7 @@ int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
......
......@@ -86,7 +86,8 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbBegin(STsdb *pTsdb) {
if (!pTsdb) return 0;
......@@ -100,69 +101,19 @@ int tsdbBegin(STsdb *pTsdb) {
return 0;
}
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet = {0};
STsdbFS *pfs = REPO_FS(pRepo);
int level;
ASSERT(pSet->fid >= pRtn->minFid);
level = tsdbGetFidLevel(pSet->fid, pRtn);
if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs));
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d, failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
return -1;
}
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
return -1;
}
tsdbInfo("vgId:%d, FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
} else {
// On a correct level
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
return -1;
}
}
return 0;
}
int tsdbPrepareCommit(STsdb *pTsdb) {
if (pTsdb->mem == NULL) return 0;
ASSERT(pTsdb->imem == NULL);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
return 0;
}
int tsdbCommit(STsdb *pRepo) {
int32_t tsdbCommit(STsdb *pTsdb) {
int32_t code = 0;
SCommitH commith = {0};
SDFileSet *pSet = NULL;
int fid;
// if (pRepo->imem == NULL) return 0;
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
ASSERT(pTsdb->imem == NULL && pTsdb->mem);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
tsdbStartCommit(pRepo);
// Resource initialization
if (tsdbInitCommitH(&commith, pRepo) < 0) {
// start commit
tsdbStartCommit(pTsdb);
if (tsdbInitCommitH(&commith, pTsdb) < 0) {
return -1;
}
......@@ -170,14 +121,14 @@ int tsdbCommit(STsdb *pRepo) {
tsdbSeekCommitIter(&commith, commith.rtn.minKey);
while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) {
if (pSet->fid < commith.rtn.minFid) {
tsdbInfo("vgId:%d, FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
tsdbInfo("vgId:%d, FSET %d on level %d disk id %d expires, remove it", REPO_ID(pTsdb), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
} else {
break;
}
}
// Loop to commit to each file
// commit
fid = tsdbNextCommitFid(&(commith));
while (true) {
// Loop over both on disk and memory
......@@ -186,7 +137,7 @@ int tsdbCommit(STsdb *pRepo) {
if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) {
// Only has existing FSET but no memory data to commit in this
// existing FSET, only check if file in correct retention
if (tsdbApplyRtnOnFSet(pRepo, pSet, &(commith.rtn)) < 0) {
if (tsdbApplyRtnOnFSet(pTsdb, pSet, &(commith.rtn)) < 0) {
tsdbDestroyCommitH(&commith);
return -1;
}
......@@ -217,12 +168,64 @@ int tsdbCommit(STsdb *pRepo) {
}
}
// end commit
tsdbDestroyCommitH(&commith);
tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS);
tsdbEndCommit(pTsdb, TSDB_CODE_SUCCESS);
return code;
}
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet = {0};
STsdbFS *pfs = REPO_FS(pRepo);
int level;
ASSERT(pSet->fid >= pRtn->minFid);
level = tsdbGetFidLevel(pSet->fid, pRtn);
if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs));
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d, failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
return -1;
}
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
return -1;
}
tsdbInfo("vgId:%d, FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
} else {
// On a correct level
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
return -1;
}
}
return 0;
}
// int tsdbPrepareCommit(STsdb *pTsdb) {
// if (pTsdb->mem == NULL) return 0;
// ASSERT(pTsdb->imem == NULL);
// pTsdb->imem = pTsdb->mem;
// pTsdb->mem = NULL;
// return 0;
// }
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
......@@ -543,8 +546,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
return -1;
}
tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet),
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo),
TSDB_FSET_FID(pSet), TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
} else {
pCommith->isRFileSet = false;
}
......@@ -716,7 +719,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// extern int32_t tsTsdbMetaCompactRatio;
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
SBlockIdx *pIdx) {
size_t nSupBlocks;
size_t nSubBlocks;
......@@ -769,7 +772,7 @@ int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray
pIdx->uid = TABLE_UID(pTable);
pIdx->hasLast = pBlock->last ? 1 : 0;
pIdx->maxKey = pBlock->keyLast;
pIdx->maxKey = pBlock->maxKey;
pIdx->numOfBlocks = (uint32_t)nSupBlocks;
pIdx->len = tlen;
pIdx->offset = (uint32_t)offset;
......@@ -777,7 +780,7 @@ int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray
return 0;
}
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
SBlockIdx *pBlkIdx;
size_t nidx = taosArrayGetSize(pIdxA);
int tlen = 0, size;
......@@ -890,7 +893,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
return -1;
}
} else {
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) {
if (tsdbCommitMemData(pCommith, pIter, pBlock->minKey.ts - 1, true) < 0) {
return -1;
}
}
......@@ -985,9 +988,9 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
TSKEY key = *(TSKEY *)arg1;
SBlock *pBlock = (SBlock *)arg2;
if (key < pBlock->keyFirst) {
if (key < pBlock->minKey.ts) {
return -1;
} else if (key > pBlock->keyLast) {
} else if (key > pBlock->maxKey.ts) {
return 1;
} else {
return 0;
......@@ -1011,7 +1014,7 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
* @param ppExBuf
* @return int
*/
int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData = NULL;
......@@ -1170,7 +1173,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)));
if (ncol != 0) {
tsdbSetBlockColOffset(pBlockCol, toffset);
pBlockCol->offset = toffset;
pBlockCol->len = flen; // data + bitmaps
pBlockCol->blen = tBitmapsLen;
++tcol;
......@@ -1215,8 +1218,8 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
pBlock->numOfSubBlocks = isSuper ? 1 : 0;
pBlock->numOfCols = nColsNotAllNull;
pBlock->numOfBSma = nColsOfBlockSma;
pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyLast(pDataCols);
pBlock->minKey.ts = dataColsKeyFirst(pDataCols);
pBlock->maxKey.ts = dataColsKeyLast(pDataCols);
pBlock->aggrStat = aggrStatus;
pBlock->blkVer = SBlockVerLatest;
pBlock->aggrOffset = (uint64_t)offsetAggr;
......@@ -1224,7 +1227,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
tsdbDebug("vgId:%d, uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_UID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
pBlock->numOfCols, pBlock->minKey.ts, pBlock->maxKey.ts);
return 0;
}
......@@ -1307,7 +1310,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
if (bidx == nBlocks - 1) {
keyLimit = pCommith->maxKey;
} else {
keyLimit = pBlock[1].keyFirst - 1;
keyLimit = pBlock[1].minKey.ts - 1;
}
SSkipListIterator titer = *(pIter->pIter);
......@@ -1349,8 +1352,8 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
}
subBlocks[pBlock->numOfSubBlocks] = block;
supBlock = *pBlock;
supBlock.keyFirst = mInfo.keyFirst;
supBlock.keyLast = mInfo.keyLast;
supBlock.minKey.ts = mInfo.keyFirst;
supBlock.maxKey.ts = mInfo.keyLast;
supBlock.numOfSubBlocks++;
supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed;
supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
typedef struct {
SMemTable *pMemTable;
int32_t minutes;
int8_t precision;
TSKEY nCommitKey;
int32_t fid;
TSKEY minKey;
TSKEY maxKey;
SReadH readh;
SDFileSet wSet;
SArray *aBlkIdx;
SArray *aSupBlk;
SArray *aSubBlk;
SArray *aDelInfo;
} SCommitH;
static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb);
static int32_t tsdbCommitEnd(SCommitH *pCHandle);
static int32_t tsdbCommitImpl(SCommitH *pCHandle);
int32_t tsdbBegin2(STsdb *pTsdb) {
int32_t code = 0;
ASSERT(pTsdb->mem == NULL);
code = tsdbMemTableCreate2(pTsdb, (SMemTable **)&pTsdb->mem);
if (code) {
tsdbError("vgId:%d failed to begin TSDB since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
goto _exit;
}
_exit:
return code;
}
int32_t tsdbCommit2(STsdb *pTsdb) {
int32_t code = 0;
SCommitH ch = {0};
// start to commit
code = tsdbCommitStart(&ch, pTsdb);
if (code) {
goto _exit;
}
// commit
code = tsdbCommitImpl(&ch);
if (code) {
goto _err;
}
// end commit
code = tsdbCommitEnd(&ch);
if (code) {
goto _exit;
}
_exit:
return code;
_err:
tsdbError("vgId:%d failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb) {
int32_t code = 0;
SMemTable *pMemTable = (SMemTable *)pTsdb->mem;
tsdbInfo("vgId:%d start to commit", TD_VID(pTsdb->pVnode));
// switch to commit
ASSERT(pTsdb->imem == NULL && pTsdb->mem);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
// open handle
pCHandle->pMemTable = pMemTable;
pCHandle->minutes = pTsdb->keepCfg.days;
pCHandle->precision = pTsdb->keepCfg.precision;
pCHandle->nCommitKey = pMemTable->minKey.ts;
code = tsdbInitReadH(&pCHandle->readh, pTsdb);
if (code) {
goto _err;
}
pCHandle->aBlkIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCHandle->aBlkIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pCHandle->aSupBlk = taosArrayInit(0, sizeof(SBlock));
if (pCHandle->aSupBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pCHandle->aSubBlk = taosArrayInit(0, sizeof(SBlock));
if (pCHandle->aSubBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pCHandle->aDelInfo = taosArrayInit(0, sizeof(SDelInfo));
if (pCHandle->aDelInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// start FS transaction
tsdbStartFSTxn(pTsdb, 0, 0);
return code;
_err:
return code;
}
static int32_t tsdbCommitEnd(SCommitH *pCHandle) {
int32_t code = 0;
STsdb *pTsdb = pCHandle->pMemTable->pTsdb;
SMemTable *pMemTable = (SMemTable *)pTsdb->imem;
// end transaction
code = tsdbEndFSTxn(pTsdb);
if (code) {
goto _err;
}
// close handle
taosArrayClear(pCHandle->aDelInfo);
taosArrayClear(pCHandle->aSubBlk);
taosArrayClear(pCHandle->aSupBlk);
taosArrayClear(pCHandle->aBlkIdx);
tsdbDestroyReadH(&pCHandle->readh);
// destroy memtable (todo: unref it)
pTsdb->imem = NULL;
tsdbMemTableDestroy2(pMemTable);
tsdbInfo("vgId:%d commit over", TD_VID(pTsdb->pVnode));
return code;
_err:
return code;
}
static int32_t tsdbCommitTableStart(SCommitH *pCHandle) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitTableEnd(SCommitH *pCHandle) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitTable(SCommitH *pCHandle, SMemData *pMemData, SBlockIdx *pBlockIdx) {
int32_t code = 0;
SMemDataIter iter = {0};
// commit table start
code = tsdbCommitTableStart(pCHandle);
if (code) {
goto _err;
}
// commit table impl
if (pMemData && pBlockIdx) {
// TODO
} else if (pMemData) {
// TODO
} else {
// TODO
}
// commit table end
code = tsdbCommitTableEnd(pCHandle);
if (code) {
goto _err;
}
return code;
_err:
return code;
}
static int32_t tsdbTableIdCmprFn(const void *p1, const void *p2) {
TABLEID *pId1 = (TABLEID *)p1;
TABLEID *pId2 = (TABLEID *)p2;
if (pId1->suid < pId2->suid) {
return -1;
} else if (pId1->suid > pId2->suid) {
return 1;
}
if (pId1->uid < pId2->uid) {
return -1;
} else if (pId1->uid > pId2->uid) {
return 1;
}
return 0;
}
static int32_t tsdbWriteBlockIdx(SDFile *pFile, SArray *pArray, uint8_t **ppBuf) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitFileStart(SCommitH *pCHandle) {
int32_t code = 0;
STsdb *pTsdb = pCHandle->pMemTable->pTsdb;
SDFileSet *pSet = NULL;
taosArrayClear(pCHandle->aBlkIdx);
return code;
}
static int32_t tsdbCommitFileEnd(SCommitH *pCHandle) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitFile(SCommitH *pCHandle) {
int32_t code = 0;
SMemData *pMemData;
SBlockIdx *pBlockIdx;
int32_t iMemData;
int32_t nMemData;
int32_t iBlockIdx;
int32_t nBlockIdx;
// commit file start
code = tsdbCommitFileStart(pCHandle);
if (code) {
goto _err;
}
// commit file impl
iMemData = 0;
nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData);
iBlockIdx = 0;
nBlockIdx = 0; // todo
for (;;) {
if (iMemData >= nMemData && iBlockIdx >= nBlockIdx) break;
pMemData = NULL;
pBlockIdx = NULL;
if (iMemData < nMemData) {
pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);
}
if (iBlockIdx < nBlockIdx) {
// pBlockIdx = ;
}
if (pMemData && pBlockIdx) {
int32_t c = tsdbTableIdCmprFn(pMemData, pBlockIdx);
if (c < 0) {
iMemData++;
pBlockIdx = NULL;
} else if (c == 0) {
iMemData++;
iBlockIdx++;
} else {
iBlockIdx++;
pMemData = NULL;
}
} else {
if (pMemData) {
iMemData++;
} else {
iBlockIdx++;
}
}
code = tsdbCommitTable(pCHandle, pMemData, pBlockIdx);
if (code) {
goto _err;
}
}
// commit file end
code = tsdbCommitFileEnd(pCHandle);
if (code) {
goto _err;
}
return code;
_err:
return code;
}
static int32_t tsdbCommitData(SCommitH *pCHandle) {
int32_t code = 0;
int32_t fid;
if (pCHandle->pMemTable->nRows == 0) goto _exit;
// loop to commit to each file
for (;;) {
if (pCHandle->nCommitKey == TSKEY_MAX) break;
pCHandle->fid = TSDB_KEY_FID(pCHandle->nCommitKey, pCHandle->minutes, pCHandle->precision);
tsdbGetFidKeyRange(pCHandle->minutes, pCHandle->precision, pCHandle->fid, &pCHandle->minKey, &pCHandle->maxKey);
code = tsdbCommitFile(pCHandle);
if (code) {
goto _err;
}
}
_exit:
return code;
_err:
return code;
}
static int32_t delInfoCmprFn(const void *p1, const void *p2) {
SDelInfo *pDelInfo1 = (SDelInfo *)p1;
SDelInfo *pDelInfo2 = (SDelInfo *)p2;
if (pDelInfo1->suid < pDelInfo2->suid) {
return -1;
} else if (pDelInfo1->suid > pDelInfo2->suid) {
return 1;
}
if (pDelInfo1->uid < pDelInfo2->uid) {
return -1;
} else if (pDelInfo1->uid > pDelInfo2->uid) {
return 1;
}
if (pDelInfo1->version < pDelInfo2->version) {
return -1;
} else if (pDelInfo1->version > pDelInfo2->version) {
return 1;
}
return 0;
}
static int32_t tsdbCommitDelete(SCommitH *pCHandle) {
int32_t code = 0;
SDelInfo delInfo;
SMemData *pMemData;
if (pCHandle->pMemTable->nDelOp == 0) goto _exit;
// load del array (todo)
// loop to append SDelInfo
for (int32_t iMemData = 0; iMemData < taosArrayGetSize(pCHandle->pMemTable->aMemData); iMemData++) {
pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData);
for (SDelOp *pDelOp = pMemData->delOpHead; pDelOp; pDelOp = pDelOp->pNext) {
delInfo = (SDelInfo){.suid = pMemData->suid,
.uid = pMemData->uid,
.version = pDelOp->version,
.sKey = pDelOp->sKey,
.eKey = pDelOp->eKey};
if (taosArrayPush(pCHandle->aDelInfo, &delInfo) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
}
taosArraySort(pCHandle->aDelInfo, delInfoCmprFn);
// write to new file
_exit:
return code;
_err:
return code;
}
static int32_t tsdbCommitCache(SCommitH *pCHandle) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitImpl(SCommitH *pCHandle) {
int32_t code = 0;
// commit data
code = tsdbCommitData(pCHandle);
if (code) {
goto _err;
}
// commit delete
code = tsdbCommitDelete(pCHandle);
if (code) {
goto _err;
}
// commit cache if need (todo)
if (0) {
code = tsdbCommitCache(pCHandle);
if (code) {
goto _err;
}
}
return code;
_err:
return code;
}
\ No newline at end of file
......@@ -37,11 +37,11 @@ static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
// static int tsdbProcessExpiredFS(STsdb *pRepo);
// static int tsdbCreateMeta(STsdb *pRepo);
static void tsdbGetRootDir(int repoid, const char* dir, char dirName[]) {
static void tsdbGetRootDir(int repoid, const char *dir, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s", repoid, dir);
}
static void tsdbGetDataDir(int repoid, const char* dir, char dirName[]) {
static void tsdbGetDataDir(int repoid, const char *dir, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data", repoid, dir);
}
......@@ -216,16 +216,7 @@ STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg) {
return NULL;
}
pfs->metaCache = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pfs->metaCache == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbFreeFS(pfs);
return NULL;
}
pfs->intxn = false;
pfs->metaCacheComp = NULL;
pfs->nstatus = tsdbNewFSStatus(maxFSet);
if (pfs->nstatus == NULL) {
tsdbFreeFS(pfs);
......@@ -238,8 +229,6 @@ STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg) {
void *tsdbFreeFS(STsdbFS *pfs) {
if (pfs) {
pfs->nstatus = tsdbFreeFSStatus(pfs->nstatus);
taosHashCleanup(pfs->metaCache);
pfs->metaCache = NULL;
pfs->cstatus = tsdbFreeFSStatus(pfs->cstatus);
taosThreadRwlockDestroy(&(pfs->lock));
taosMemoryFree(pfs);
......@@ -963,13 +952,6 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
}
static int tsdbRestoreCurrent(STsdb *pRepo) {
// // Loop to recover mfile
// if (tsdbRestoreMeta(pRepo) < 0) {
// tsdbError("vgId:%d, failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno));
// return -1;
// }
// Loop to recover dfile set
if (tsdbRestoreDFileSet(pRepo) < 0) {
tsdbError("vgId:%d, failed to restore DFileSet since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
......@@ -1052,3 +1034,30 @@ static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired) {
tsdbCloseDFileSet(&fset);
}
}
int tsdbRLockFS(STsdbFS *pFs) {
int code = taosThreadRwlockRdlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
int tsdbWLockFS(STsdbFS *pFs) {
int code = taosThreadRwlockWrlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
int tsdbUnLockFS(STsdbFS *pFs) {
int code = taosThreadRwlockUnlock(&(pFs->lock));
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
\ No newline at end of file
......@@ -25,7 +25,7 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"meta", // TSDB_FILE_META
};
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char* dname, char *fname);
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char *dname, char *fname);
// static int tsdbRollBackMFile(SMFile *pMFile);
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo);
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo);
......@@ -448,3 +448,136 @@ static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, c
}
}
}
int tsdbOpenDFile(SDFile *pDFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pDFile));
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), flags);
if (pDFile->pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
void tsdbCloseDFile(SDFile *pDFile) {
if (TSDB_FILE_OPENED(pDFile)) {
taosCloseFile(&pDFile->pFile);
TSDB_FILE_SET_CLOSED(pDFile);
}
}
int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) {
// ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t loffset = taosLSeekFile(TSDB_FILE_PFILE(pDFile), offset, whence);
if (loffset < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return loffset;
}
int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite = taosWriteFile(pDFile->pFile, buf, nbyte);
if (nwrite < nbyte) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nwrite;
}
void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm) {
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
}
int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t toffset;
if ((toffset = tsdbSeekDFile(pDFile, 0, SEEK_END)) < 0) {
return -1;
}
ASSERT(pDFile->info.size == toffset);
if (offset) {
*offset = toffset;
}
if (tsdbWriteDFile(pDFile, buf, nbyte) < 0) {
return -1;
}
pDFile->info.size += nbyte;
return (int)nbyte;
}
int tsdbRemoveDFile(SDFile *pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nread = taosReadFile(pDFile->pFile, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
}
int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest) {
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pDest->info = pSrc->info;
return 0;
}
void tsdbCloseDFileSet(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype));
}
}
int tsdbOpenDFileSet(SDFileSet *pSet, int flags) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) {
tsdbCloseDFileSet(pSet);
return -1;
}
}
return 0;
}
void tsdbRemoveDFileSet(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
(void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
}
}
int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
tsdbRemoveDFileSet(pDest);
return -1;
}
}
return 0;
}
void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fid * days * tsTickPerMin[precision];
*maxKey = *minKey + days * tsTickPerMin[precision] - 1;
}
\ No newline at end of file
......@@ -15,42 +15,15 @@
#include "tsdb.h"
typedef struct SMemData SMemData;
typedef struct SMemSkipList SMemSkipList;
typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode {
int8_t level;
SMemSkipListNode *forwards[0];
};
struct SMemSkipList {
uint32_t seed;
int32_t size;
int8_t maxLevel;
int8_t level;
SMemSkipListNode *pHead;
SMemSkipListNode *pTail;
};
struct SMemData {
tb_uid_t suid;
typedef struct {
tb_uid_t uid;
TSDBKEY minKey;
TSDBKEY maxKey;
SDelOp *delOpHead;
SDelOp *delOpTail;
SMemSkipList sl;
};
struct SMemTable {
STsdb *pTsdb;
int32_t nRef;
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t nRows;
SArray *pArray; // SArray<SMemData>
};
STSchema *pTSchema;
} SSkmInfo;
#define SL_MAX_LEVEL 5
......@@ -59,14 +32,17 @@ struct SMemTable {
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2
static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData);
static int memDataPCmprFn(const void *p1, const void *p2);
static int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
static int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow);
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos);
static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward,
SMemSkipListNode **pos);
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version,
SVSubmitBlk *pSubmitBlk);
static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
// SMemTable ==============================================
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
......@@ -83,8 +59,9 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable) {
pMemTable->minKey = (TSDBKEY){.version = INT64_MAX, .ts = TSKEY_MAX};
pMemTable->maxKey = (TSDBKEY){.version = -1, .ts = TSKEY_MIN};
pMemTable->nRows = 0;
pMemTable->pArray = taosArrayInit(512, sizeof(SMemData *));
if (pMemTable->pArray == NULL) {
pMemTable->nDelOp = 0;
pMemTable->aMemData = taosArrayInit(512, sizeof(SMemData *));
if (pMemTable->aMemData == NULL) {
taosMemoryFree(pMemTable);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
......@@ -99,7 +76,7 @@ _err:
}
void tsdbMemTableDestroy2(SMemTable *pMemTable) {
taosArrayDestroyEx(pMemTable->pArray, NULL /*TODO*/);
taosArrayDestroyEx(pMemTable->aMemData, NULL /*TODO*/);
taosMemoryFree(pMemTable);
}
......@@ -123,30 +100,11 @@ int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmit
}
// do insert
int32_t nt;
int32_t n = 0;
uint8_t *p = pSubmitBlk->pData;
int32_t nRow = 0;
SMemSkipListNode *pos[SL_MAX_LEVEL] = {0};
for (int8_t iLevel = 0; iLevel < SL_MAX_LEVEL; iLevel++) {
pos[iLevel] = pMemData->sl.pTail;
}
while (n < pSubmitBlk->nData) {
nt = tGetTSRow(p + n, &row.tsRow);
n += nt;
ASSERT(n <= pSubmitBlk->nData);
memDataMovePos(pMemData, &row, nRow ? 1 : 0, pos);
code = memDataPutRow(pTsdb->pVnode->inUse, pMemData, &row, nRow ? 1 : 0, pos);
code = tsdbInsertTableDataImpl(pMemTable, pMemData, version, pSubmitBlk);
if (code) {
goto _err;
}
nRow++;
}
return code;
_err:
......@@ -192,6 +150,8 @@ int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_ui
// update the state of pMemTable, pMemData, last and lastrow (todo)
}
pMemTable->nDelOp++;
tsdbDebug("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " sKey:%" PRId64 " eKey:%" PRId64
" since %s",
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
......@@ -204,6 +164,92 @@ _err:
return code;
}
void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter) {
SMemSkipListNode *pos[SL_MAX_LEVEL];
pIter->pMemData = pMemData;
pIter->backward = backward;
pIter->pRow = NULL;
if (pKey == NULL) {
// create from head or tail
if (backward) {
pIter->pNode = SL_NODE_BACKWARD(pMemData->sl.pTail, 0);
} else {
pIter->pNode = SL_NODE_FORWARD(pMemData->sl.pHead, 0);
}
} else {
// create from a key
if (backward) {
memDataMovePosTo(pMemData, pos, pKey, SL_MOVE_BACKWARD);
pIter->pNode = SL_NODE_BACKWARD(pos[0], 0);
} else {
memDataMovePosTo(pMemData, pos, pKey, 0);
pIter->pNode = SL_NODE_FORWARD(pos[0], 0);
}
}
}
bool tsdbMemDataIterNext(SMemDataIter *pIter) {
SMemSkipListNode *pHead = pIter->pMemData->sl.pHead;
SMemSkipListNode *pTail = pIter->pMemData->sl.pTail;
pIter->pRow = NULL;
if (pIter->backward) {
ASSERT(pIter->pNode != pTail);
if (pIter->pNode == pHead) {
return false;
}
pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0);
if (pIter->pNode == pHead) {
return false;
}
} else {
ASSERT(pIter->pNode != pHead);
if (pIter->pNode == pTail) {
return false;
}
pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0);
if (pIter->pNode == pTail) {
return false;
}
}
return true;
}
void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow) {
if (pIter->pRow) {
*ppRow = pIter->pRow;
} else {
SMemSkipListNode *pHead = pIter->pMemData->sl.pHead;
SMemSkipListNode *pTail = pIter->pMemData->sl.pTail;
if (pIter->backward) {
ASSERT(pIter->pNode != pTail);
if (pIter->pNode == pHead) {
*ppRow = NULL;
} else {
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
*ppRow = &pIter->row;
}
} else {
ASSERT(pIter->pNode != pHead);
if (pIter->pNode == pTail) {
*ppRow = NULL;
} else {
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
*ppRow = &pIter->row;
}
}
}
}
static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, SMemData **ppMemData) {
int32_t code = 0;
int32_t idx = 0;
......@@ -213,9 +259,9 @@ static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_ui
int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
// get
idx = taosArraySearchIdx(pMemTable->pArray, &pMemDataT, memDataPCmprFn, TD_GE);
idx = taosArraySearchIdx(pMemTable->aMemData, &pMemDataT, memDataPCmprFn, TD_GE);
if (idx >= 0) {
pMemData = (SMemData *)taosArrayGet(pMemTable->pArray, idx);
pMemData = (SMemData *)taosArrayGet(pMemTable->aMemData, idx);
if (memDataPCmprFn(&pMemDataT, &pMemData) == 0) goto _exit;
}
......@@ -247,7 +293,7 @@ static int32_t tsdbGetOrCreateMemData(SMemTable *pMemTable, tb_uid_t suid, tb_ui
}
if (idx < 0) idx = 0;
if (taosArrayInsert(pMemTable->pArray, idx, &pMemData) == NULL) {
if (taosArrayInsert(pMemTable->aMemData, idx, &pMemData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -310,86 +356,179 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
return level;
}
static void memDataMovePos(SMemData *pMemData, TSDBROW *pRow, int8_t isForward, SMemSkipListNode **pos) {
TSDBKEY *pKey;
static void memDataMovePosTo(SMemData *pMemData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
SMemSkipListNode *px;
SMemSkipListNode *pn;
TSDBKEY *pTKey;
int c;
int backward = flags & SL_MOVE_BACKWARD;
int fromPos = flags & SL_MOVE_FROM_POS;
if (isForward) {
// TODO
} else {
SMemSkipListNode *px = pMemData->sl.pTail;
if (backward) {
px = pMemData->sl.pTail;
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= pMemData->sl.level; iLevel--) {
pos[iLevel] = px;
}
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
if (iLevel < pMemData->sl.level) {
SMemSkipListNode *p = SL_NODE_BACKWARD(px, iLevel);
if (pMemData->sl.level) {
if (fromPos) px = pos[pMemData->sl.level - 1];
while (p != pMemData->sl.pHead) {
pKey = (TSDBKEY *)SL_NODE_DATA(p);
for (int8_t iLevel = pMemData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_BACKWARD(px, iLevel);
while (pn != pMemData->sl.pHead) {
pTKey = (TSDBKEY *)SL_NODE_DATA(pn);
c = tsdbKeyCmprFn(pKey, pRow);
c = tsdbKeyCmprFn(pTKey, pKey);
if (c <= 0) {
break;
} else {
px = p;
p = SL_NODE_BACKWARD(px, iLevel);
px = pn;
pn = SL_NODE_BACKWARD(px, iLevel);
}
}
pos[iLevel] = px;
}
}
} else {
px = pMemData->sl.pHead;
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= pMemData->sl.level; iLevel--) {
pos[iLevel] = px;
}
}
static void memMovePosFrom(SMemData *pMemData, SMemSkipListNode *pNode, TSDBROW *pRow, int8_t isForward,
SMemSkipListNode **pos) {
SMemSkipListNode *px = pNode;
TSDBKEY *pKey;
SMemSkipListNode *p;
int c;
if (pMemData->sl.level) {
if (fromPos) px = pos[pMemData->sl.level - 1];
if (isForward) {
} else {
ASSERT(pNode != pMemData->sl.pHead);
for (int8_t iLevel = pMemData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_FORWARD(px, iLevel);
while (pn != pMemData->sl.pHead) {
pTKey = (TSDBKEY *)SL_NODE_DATA(pn);
for (int8_t iLevel = pMemData->sl.maxLevel - 1; iLevel >= 0; iLevel--) {
p = SL_NODE_BACKWARD(px, iLevel);
while (p != pMemData->sl.pHead) {
pKey = (TSDBKEY *)SL_NODE_DATA(p);
c = tsdbKeyCmprFn(pKey, pRow);
if (c <= 0) {
c = tsdbKeyCmprFn(pTKey, pKey);
if (c >= 0) {
break;
} else {
px = p;
p = SL_NODE_BACKWARD(px, iLevel);
px = pn;
pn = SL_NODE_FORWARD(px, iLevel);
}
}
pos[iLevel] = px;
}
}
}
}
static int32_t memDataPutRow(SVBufPool *pPool, SMemData *pMemData, TSDBROW *pRow, int8_t isForward,
SMemSkipListNode **pos) {
static int32_t memDataDoPut(SMemTable *pMemTable, SMemData *pMemData, SMemSkipListNode **pos, TSDBROW *pRow,
int8_t forward) {
int32_t code = 0;
int8_t level;
SMemSkipListNode *pNode;
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
// node
level = tsdbMemSkipListRandLevel(&pMemData->sl);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pNode->level = level;
for (int8_t iLevel = 0; iLevel < level; iLevel++) {
SL_NODE_FORWARD(pNode, iLevel) = NULL;
SL_NODE_BACKWARD(pNode, iLevel) = NULL;
}
tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow);
// put
for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) {
SMemSkipListNode *px = pos[iLevel];
// do the read put
if (isForward) {
// TODO
if (forward) {
SMemSkipListNode *pNext = SL_NODE_FORWARD(px, iLevel);
SL_NODE_FORWARD(pNode, iLevel) = pNext;
SL_NODE_BACKWARD(pNode, iLevel) = px;
SL_NODE_BACKWARD(pNext, iLevel) = pNode;
SL_NODE_FORWARD(px, iLevel) = pNode;
} else {
// TODO
SMemSkipListNode *pPrev = SL_NODE_BACKWARD(px, iLevel);
SL_NODE_FORWARD(pNode, iLevel) = px;
SL_NODE_BACKWARD(pNode, iLevel) = pPrev;
SL_NODE_FORWARD(pPrev, iLevel) = pNode;
SL_NODE_BACKWARD(px, iLevel) = pNode;
}
}
pMemData->sl.size++;
if (pMemData->sl.level < pNode->level) {
pMemData->sl.level = pNode->level;
}
_exit:
return code;
}
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, SMemData *pMemData, int64_t version,
SVSubmitBlk *pSubmitBlk) {
int32_t code = 0;
int32_t n = 0;
uint8_t *p = pSubmitBlk->pData;
int32_t nRow = 0;
TSDBROW row = {.version = version};
SMemSkipListNode *pos[SL_MAX_LEVEL];
ASSERT(pSubmitBlk->nData);
// backward put first data
n += tGetTSRow(p + n, &row.tsRow);
ASSERT(n <= pSubmitBlk->nData);
memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_BACKWARD);
code = memDataDoPut(pMemTable, pMemData, pos, &row, 0);
if (code) {
goto _exit;
}
nRow++;
if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemData->minKey) < 0) {
pMemData->minKey = *(TSDBKEY *)&row;
}
if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemTable->minKey) < 0) {
pMemTable->minKey = *(TSDBKEY *)&row;
}
// forward put rest
for (int8_t iLevel = 0; iLevel < pMemData->sl.maxLevel; iLevel++) {
pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
}
while (n < pSubmitBlk->nData) {
n += tGetTSRow(p + n, &row.tsRow);
ASSERT(n <= pSubmitBlk->nData);
memDataMovePosTo(pMemData, pos, &(TSDBKEY){.version = version, .ts = row.tsRow.ts}, SL_MOVE_FROM_POS);
code = memDataDoPut(pMemTable, pMemData, pos, &row, 1);
if (code) {
goto _exit;
}
nRow++;
}
if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemData->maxKey) > 0) {
pMemData->maxKey = *(TSDBKEY *)&row;
}
if (tsdbKeyCmprFn((TSDBKEY *)&row, &pMemTable->maxKey) > 0) {
pMemTable->maxKey = *(TSDBKEY *)&row;
}
pMemTable->nRows += nRow;
_exit:
return code;
......
......@@ -21,7 +21,7 @@
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
#define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
((SDataBlockInfo){.window = {.skey = (_block)->minKey.ts, .ekey = (_block)->maxKey.ts}, \
.numOfCols = (_block)->numOfCols, \
.rows = (_block)->numOfRows, \
.uid = (_checkInfo)->tableId})
......@@ -1105,12 +1105,12 @@ static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY s
if (numOfBlocks == 1) break;
if (skey > pBlock[midSlot].keyLast) {
if (skey > pBlock[midSlot].maxKey.ts) {
if (numOfBlocks == 2) break;
if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break;
if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
firstSlot = midSlot + 1;
} else if (skey < pBlock[midSlot].keyFirst) {
if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].keyLast)) break;
} else if (skey < pBlock[midSlot].minKey.ts) {
if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
lastSlot = midSlot - 1;
} else {
break; // got the slot
......@@ -1177,12 +1177,12 @@ static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, in
int32_t start = binarySearchForBlock(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) {
if (s > pCompInfo->blocks[start].maxKey.ts) {
return 0;
}
// todo speedup the procedure of located end block
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
while (end < (int32_t)compIndex->numOfBlocks && (pCompInfo->blocks[end].minKey.ts <= e)) {
end += 1;
}
......@@ -1275,7 +1275,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
pBlock->numOfRows = pCols->numOfRows;
// Convert from TKEY to TSKEY for primary timestamp column if current block has timestamp before 1970-01-01T00:00:00Z
if (pBlock->keyFirst < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
if (pBlock->minKey.ts < 0 && colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
int64_t* src = pCols->cols[0].pData;
for (int32_t i = 0; i < pBlock->numOfRows; ++i) {
src[i] = tdGetKey(src[i]);
......@@ -1287,7 +1287,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
tsdbDebug("%p load file block into buffer, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%" PRId64
" us, %s",
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime,
pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows, elapsedTime,
pTsdbReadHandle->idStr);
return TSDB_CODE_SUCCESS;
......@@ -1295,7 +1295,8 @@ _error:
pBlock->numOfRows = 0;
tsdbError("%p error occurs in loading file block, index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s",
pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr);
pTsdbReadHandle, slotIndex, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->numOfRows,
pTsdbReadHandle->idStr);
return terrno;
}
......@@ -1423,7 +1424,7 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc
if (asc) {
// query ended in/started from current block
if (pTsdbReadHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
if (pTsdbReadHandle->window.ekey < pBlock->maxKey.ts || pCheckInfo->lastKey > pBlock->minKey.ts) {
if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
*exists = false;
return code;
......@@ -1432,35 +1433,35 @@ static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBloc
SDataCols* pTSCol = pTsdbReadHandle->rhelper.pDCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
if (pCheckInfo->lastKey > pBlock->keyFirst) {
if (pCheckInfo->lastKey > pBlock->minKey.ts) {
cur->pos =
binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
} else {
cur->pos = 0;
}
assert(pCheckInfo->lastKey <= pBlock->keyLast);
assert(pCheckInfo->lastKey <= pBlock->maxKey.ts);
doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer
cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
}
} else { // desc order, query ended in current block
if (pTsdbReadHandle->window.ekey > pBlock->keyFirst || pCheckInfo->lastKey < pBlock->keyLast) {
if (pTsdbReadHandle->window.ekey > pBlock->minKey.ts || pCheckInfo->lastKey < pBlock->maxKey.ts) {
if ((code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
*exists = false;
return code;
}
SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0];
if (pCheckInfo->lastKey < pBlock->keyLast) {
if (pCheckInfo->lastKey < pBlock->maxKey.ts) {
cur->pos =
binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pTsdbReadHandle->order);
} else {
cur->pos = pBlock->numOfRows - 1;
}
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
assert(pCheckInfo->lastKey >= pBlock->minKey.ts);
doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
} else {
cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
......@@ -1981,8 +1982,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
// Even Multi-Version supported, the records with duplicated TSKEY would be merged inside of tsdbLoadData interface.
TSKEY* tsArray = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->minKey.ts &&
tsArray[pBlock->numOfRows - 1] == pBlock->maxKey.ts);
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
int32_t step = ascScan ? 1 : -1;
......@@ -3576,8 +3577,8 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDat
assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
pPrimaryColStatis->numOfNull = 0;
pPrimaryColStatis->min = pBlockInfo->compBlock->keyFirst;
pPrimaryColStatis->max = pBlockInfo->compBlock->keyLast;
pPrimaryColStatis->min = pBlockInfo->compBlock->minKey.ts;
pPrimaryColStatis->max = pBlockInfo->compBlock->maxKey.ts;
pHandle->suppInfo.plist[0] = &pHandle->suppInfo.pstatis[0];
// update the number of NULL data rows
......
......@@ -339,8 +339,8 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
}
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst);
ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast);
ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->minKey.ts);
ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->maxKey.ts);
return 0;
}
......@@ -457,8 +457,8 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
}
ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst);
ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast);
ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->minKey.ts);
ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->maxKey.ts);
return 0;
}
......@@ -559,7 +559,7 @@ int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
tlen += taosEncodeFixedU8(buf, pIdx->hasLast);
tlen += taosEncodeVariantU32(buf, pIdx->numOfBlocks);
tlen += taosEncodeFixedU64(buf, pIdx->uid);
tlen += taosEncodeFixedU64(buf, pIdx->maxKey);
tlen += taosEncodeFixedU64(buf, pIdx->maxKey.ts);
return tlen;
}
......@@ -579,7 +579,7 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->uid = (int64_t)value;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->maxKey = (TSKEY)value;
pIdx->maxKey.ts = (TSKEY)value;
return buf;
}
......@@ -726,7 +726,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
if (dcol != 0) {
pBlockCol = &(pBlockData->cols[ccol]);
tcolId = pBlockCol->colId;
toffset = tsdbGetBlockColOffset(pBlockCol);
toffset = pBlockCol->offset;
tlen = pBlockCol->len;
pDataCol->bitmap = pBlockCol->blen > 0 ? 1 : 0;
} else {
......@@ -942,8 +942,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1;
if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1;
int64_t offset = pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer) +
tsdbGetBlockColOffset(pBlockCol);
int64_t offset =
pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer) + pBlockCol->offset;
if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) {
tsdbError("vgId:%d, failed to load block column data while seek file %s to offset %" PRId64 " since %s",
TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, tstrerror(terrno));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册