From 313fd6b2de8516740ac0ab13f31a2579301bb061 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 5 Jan 2021 11:33:23 +0000 Subject: [PATCH] partial work --- src/tsdb/inc/tsdbMain.h | 53 +---------- src/tsdb/inc/tsdbReadImpl.h | 103 ++++++++++++++++++++ src/tsdb/src/tsdbCommit.c | 184 +++++++++++++++--------------------- src/tsdb/src/tsdbFile.c | 19 +++- src/tsdb/src/tsdbReadImpl.c | 175 ++++++++++++++++++++++++++++++++++ 5 files changed, 376 insertions(+), 158 deletions(-) create mode 100644 src/tsdb/inc/tsdbReadImpl.h create mode 100644 src/tsdb/src/tsdbReadImpl.c diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index feaeea9972..6142fd1880 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -350,6 +350,7 @@ typedef struct { void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo* pInfo, TSDB_FILE_T ftype); +void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile); int tsdbOpenDFile(SDFile* pDFile, int flags); void tsdbCloseDFile(SDFile* pDFile); int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence); @@ -627,57 +628,9 @@ static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { return pBufBlock; } -// ================= tsdbRWHelper.c -typedef struct { - int32_t tid; - uint32_t len; - uint32_t offset; - uint32_t hasLast : 2; - uint32_t numOfBlocks : 30; - uint64_t uid; - TSKEY maxKey; -} SBlockIdx; - -typedef struct { - int64_t last : 1; - int64_t offset : 63; - int32_t algorithm : 8; - int32_t numOfRows : 24; - int32_t len; - int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols - int16_t numOfSubBlocks; - int16_t numOfCols; // not including timestamp column - TSKEY keyFirst; - TSKEY keyLast; -} SBlock; +#include "tsdbReadImpl.h" -typedef struct { - int32_t delimiter; // For recovery usage - int32_t tid; - uint64_t uid; - SBlock blocks[]; -} SBlockInfo; - -typedef struct { - int16_t colId; - int32_t len; - 
int32_t type : 8; - int32_t offset : 24; - int64_t sum; - int64_t max; - int64_t min; - int16_t maxIndex; - int16_t minIndex; - int16_t numOfNull; - char padding[2]; -} SBlockCol; - -typedef struct { - int32_t delimiter; // For recovery usage - int32_t numOfCols; // For recovery usage - uint64_t uid; // For recovery usage - SBlockCol cols[]; -} SBlockData; +// ================= tsdbRWHelper.c typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h new file mode 100644 index 0000000000..3960ce0b6d --- /dev/null +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ + +#ifndef _TD_TSDB_READ_IMPL_H_ +#define _TD_TSDB_READ_IMPL_H_ + +#include "taosdef.h" +#include "tdataformat.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SReadH SReadH; + +typedef struct { + int32_t tid; + uint32_t len; + uint32_t offset; + uint32_t hasLast : 2; + uint32_t numOfBlocks : 30; + uint64_t uid; + TSKEY maxKey; +} SBlockIdx; + +typedef struct { + int64_t last : 1; + int64_t offset : 63; + int32_t algorithm : 8; + int32_t numOfRows : 24; + int32_t len; + int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols + int16_t numOfSubBlocks; + int16_t numOfCols; // not including timestamp column + TSKEY keyFirst; + TSKEY keyLast; +} SBlock; + +typedef struct { + int32_t delimiter; // For recovery usage + int32_t tid; + uint64_t uid; + SBlock blocks[]; +} SBlockInfo; + +typedef struct { + int16_t colId; + int32_t len; + int32_t type : 8; + int32_t offset : 24; + int64_t sum; + int64_t max; + int64_t min; + int16_t maxIndex; + int16_t minIndex; + int16_t numOfNull; + char padding[2]; +} SBlockCol; + +typedef struct { + int32_t delimiter; // For recovery usage + int32_t numOfCols; // For recovery usage + uint64_t uid; // For recovery usage + SBlockCol cols[]; +} SBlockData; + +struct SReadH { + STsdbRepo * pRepo; + SDFileSet * pSet; + SArray * aBlkIdx; + int cidx; + STable * pTable; + SBlockIdx * pBlockIdx; + SBlockInfo *pBlkInfo; + SBlockData *pBlkData; + SDataCols * pDCols[2]; + void * pBuf; + void * pCBuf; +}; + +#define TSDB_READ_REPO(rh) (rh)->pRepo +#define TSDB_READ_FSET(rh) (rh)->pSet +#define TSDB_READ_BUF(rh) (rh)->pBuf +#define TSDB_READ_COMP_BUF(rh) (rh)->pCBuf +#define TSDB_READ_FSET_IS_SET(rh) ((rh)->pSet != NULL) + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_TSDB_READ_IMPL_H_*/ \ No newline at end of file diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 62d84b66b6..669c736b74 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ 
-14,6 +14,8 @@ */ #include "tsdbMain.h" +#define TSDB_IVLD_FID INT_MIN + typedef struct { int minFid; int midFid; @@ -77,6 +79,7 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { SCommitH ch = {0}; SFSIter fsIter = {0}; SDFileSet *pOldSet = NULL; + int fid; if (pMem->numOfRows <= 0) return 0; @@ -84,20 +87,30 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { return -1; } - // TODO - int sfid = MIN(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/); - int efid = MAX(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/); + tsdbSeekCommitIter(ch.iters, pMem->maxTables, ch.rtn.minKey); tsdbInitFSIter(pRepo, &fsIter); pOldSet = tsdbFSIterNext(&fsIter); - for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, pOldSet, &ch, fid) < 0) { - tsdbDestroyCommitH(&ch, pMem->maxTables); - return -1; - } - - if (pOldSet != NULL && pOldSet->fid == fid) { + fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); + + while (true) { + if (pOldSet == NULL && fid == TSDB_IVLD_FID) break; + + if (pOldSet == NULL || (fid != TSDB_IVLD_FID && pOldSet->fid > fid)) { + ASSERT(fid >= ch.rtn.minFid); + // commit to new SDFileSet fid + tsdbCommitToFile(pRepo, NULL, &ch, fid); + fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); + } else if (fid != TSDB_IVLD_FID && pOldSet->fid == fid) { + ASSERT(fid >= ch.rtn.minFid); + // commit to fid with old SDFileSet + tsdbCommitToFile(pRepo, pOldSet, &ch, fid); + fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); pOldSet = tsdbFSIterNext(&fsIter); + } else { + // check if pOldSet need to be changed + tsdbCommitToFile(pRepo, pOldSet, &ch, TSDB_IVLD_FID); + pOldSet = tsdbFSIterNext(&fsIter) } } @@ -195,115 +208,64 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS } static int tsdbCommitToFile(STsdbRepo *pRepo, SDFileSet *pOldSet, SCommitH *pch, int fid) { - STsdbCfg * pCfg = &(pRepo->config); - SMemTable *pMem = pRepo->imem; - TSKEY minKey, maxKey; - 
bool hasData; - SDFileSet rSet, wSet; + SDFileSet rSet; + SDFileSet wSet; + int level; - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - hasData = tsdbHasDataToCommit(pch->iters, pMem->maxTables, minKey, maxKey); - - if (pOldSet == NULL || pOldSet->fid != fid) { // need to create SDFileSet and commit - if (!hasData) return 0; - - tsdbInitDFileSet(&wSet, REPO_ID(pRepo), fid, 0/*TODO*/, level, TFS_UNDECIDED_ID); - tsdbOpenDFileSet(&wSet, O_WRONLY | O_CREAT); - tsdbUpdateDFileSetHeader(&wSet); - } else { - int level = tsdbGetFidLevel(fid, &(pch->rtn)); - - // Check if SDFileSet expires - if (level < 0) { - if (hasData) { - tsdbSeekCommitIter(pch->iters, pMem->maxTables, maxKey + 1); - } - return 0; - } - - // TODO: Check if SDFileSet in correct level - if (true /*pOldSet level is not the same as level*/) { - tsdbInitDFileSet(&rSet, REPO_ID(pRepo), fid, 0/*TODO*/, level, TFS_UNDECIDED_ID); - // TODO: check if level is correct - tsdbOpenDFileSet(&wSet, O_WRONLY|O_CREAT); - } - } - - // TODO: close the file set - if (!hasData) { - tsdbUpdateDFileSet(pRepo, &rSet); + if (pOldSet && pOldSet->fid < pch->rtn.minFid) { // file is deleted + ASSERT(fid == TSDB_IVLD_FID); return 0; } - { - // TODO: commit the memory data - } - - if (tsdbUpdateDFileSet(pRepo, &wSet) < 0) { - return -1; - } - - return 0; + // if (pOldSet) { + // ASSERT(fid == TSDB_IVLD_FID || pOldSet->fid == fid); + // if (true /* TODO: pOldSet not in correct level*/) { + // // TODO: Check if pOldSet is on correct level, if not, move it to correct level + // } else { + // tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_HEAD), REPO_ID(pRepo), fid, 0 /*TODO*/, 0 /*TODO*/, 0 + // /*TODO*/, + // NULL, TSDB_FILE_HEAD); + // // TODO: init data + // tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_DATA), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_DATA)); + + // // TODO: init last file + // SDFile *pDFile = TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_LAST); + // if (pDFile->info->size < 32K) { 
+ + // } else { + + // } + + // tsdbInitDFileWithOld(&oSet, pOldSet); + // pReadSet = &oSet; + // } + // } else { + // ASSERT(fid != TSDB_IVLD_FID); + + // // Create a new file group + // tsdbInitDFileSet(&nSet, REPO_ID(pRepo), fid, 0 /*TODO*/, tsdbGetFidLevel(fid, &(pch->rtn)), TFS_UNDECIDED_ID); + // tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT); + // tsdbUpdateDFileSetHeader(&nSet); + // } -#if 0 - STsdbCfg * pCfg = &(pRepo->config); - SMemTable *pMem = pRepo->imem; - TSKEY minKey, maxKey; - SDFileSet oldSet = {0}; - SDFileSet newSet = {0}; - int level; - - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - - level = tsdbGetFidLevel(fid, &(pch->rtn)); - if (pOldSet) { // fset exists, check if the file shold be removed or upgrade tier level - if (level < 0) { // if out of data, remove it and ignore expired memory data - tsdbRemoveExpiredDFileSet(pRepo, fid); - tsdbSeekCommitIter(pch->iters, pMem->maxTables, maxKey + 1); - return 0; - } - - // Move the data file set to correct level - tsdbMoveDFileSet(pOldSet, level); - } else { // fset not exist, create the fset - pOldSet = &oldSet; - if (tsdbCreateDFileSet(fid, level, pOldSet) < 0) { - // TODO - return -1; - } + { + // TODO: set rSet and wSet, the read file set and write file set } - if (tsdbHasDataToCommit(pch->iters, pMem->maxTables, minKey, maxKey)) { - if (tsdbSetAndOpenHelperFile(&(pch->whelper), pOldSet, &newSet) < 0) return -1; - - if (tsdbLoadCompIdx(&pch->whelper, NULL) < 0) return -1; - - for (int tid = 0; tid < pMem->maxTables; tid++) { - SCommitIter *pIter = pch->iters + tid; - if (pIter->pTable == NULL) continue; - - if (tsdbSetHelperTable(&(pch->whelper), pIter->pTable, pRepo) < 0) return -1; - - TSDB_RLOCK_TABLE(pIter->pTable); - - if (pIter->pIter != NULL) { // has data in memory to commit - // TODO - } - - TSDB_RUNLOCK_TABLE(pIter->pTable); - - if (tsdbMoveLastBlockIfNeccessary() < 0) return -1; + if (fid == TSDB_IVLD_FID) { + // TODO: copy rSet as wSet + } else { + 
tsdbSetAndOpenCommitFSet(pch, &rSet, &wSet); - if (tsdbWriteCompInfo() < 0) return -1; + for (int i = 0; i < pMem->maxTable; i++) { + tsdbCommitTableData; + /* code */ } - if (tsdbWriteCompIdx() < 0) return -1; + tsdbCloseAndUnSetCommitFSet(pch); } - tsdbUpdateDFileSet(pRepo, &newSet); - - return 0; -#endif + tsdbUpdateDFileSet(pRepo, &wSet); } static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { @@ -425,4 +387,12 @@ static int tsdbGetFidLevel(int fid, SRtn *pRtn) { } else { return -1; } +} + +static int tsdbNextCommitFid(SCommitIter *iters, int niters) { + int fid = TSDB_IVLD_FID; + + // TODO + + return fid; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 785933000b..88581e44c4 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -128,6 +128,11 @@ void tsdbInitDFile(SDFile *pDFile, int vid, int fid, int ver, int level, int id, tfsInitFile(&(pDFile->f), level, id, NULL /*TODO*/); } +void tsdbInitDFileWithOld(SDFile *pDFile, SDFile *pOldDFile) { + *pDFile = *pOldDFile; + TSDB_FILE_SET_CLOSED(pDFile); +} + int tsdbOpenDFile(SDFile *pDFile, int flags) { ASSERT(!TSDB_FILE_OPENED(pDFile)); @@ -172,6 +177,18 @@ int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) { return nwrite; } +int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte) { + ASSERT(TSDB_FILE_OPENED(pDFile)); + + int64_t nread = taosRead(pDFile->fd, buf, nbyte); + if (nread < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return nread; +} + int64_t tsdbTellDFile(SDFile *pDFile) { return tsdbSeekDFile(pDFile, 0, SEEK_CUR); } int tsdbEncodeDFile(void **buf, SDFile *pDFile) { @@ -250,7 +267,7 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { return 0; } -int tsdbMoveDFileSet(SDFileSet *pOldSet, int tolevel, SDFileSet *pNewSet) { +int tsdbMoveDFileSet(SDFileSet *pOldSet, SDFileSet *pNewSet) { // TODO return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbReadImpl.c 
b/src/tsdb/src/tsdbReadImpl.c new file mode 100644 index 0000000000..a05d979f57 --- /dev/null +++ b/src/tsdb/src/tsdbReadImpl.c @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "tchecksum.h" +#include "tsdbMain.h" + +int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { + // TODO + return 0; +} + +void tsdbDestroyReadH(SReadH *pReadh) { + // TODO +} + +int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) { + // TODO + return 0; +} + +void tsdbCloseAndUnsetFSet(SReadH *pReadh) { + // TODO +} + +int tsdbLoadBlockIdx(SReadH *pReadh) { + SDFile * pDFile = TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh)); + SBlockIdx blkIdx; + + if (tsdbSeekDFile(pDFile, pDFile->info.offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s sinces %s", TSDB_READ_REPO_ID(pReadh), , + tstrerror(terrno)); + return -1; + } + + int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pDFile->info.len); + if (nread < 0) { + tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s sinces %s", TSDB_READ_REPO_ID(pReadh), , + tstrerror(terrno)); + return -1; + } + + if (nread < pDFile->info.len) { + tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s sinces %s", TSDB_READ_REPO_ID(pReadh), , + tstrerror(terrno)); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), pDFile->info.len)) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; +
} + + void *ptr = TSDB_READ_BUF(pReadh); + while (POINTER_DISTANCE(ptr, TSDB_READ_BUF(pReadh)) < (pDFile->info.len - sizeof(TSCKSUM))) { + ptr = tsdbDecodeSBlockIdx(ptr, &blkIdx); + + if (taosArrayPush(pReadh->aBlcIdx, (void *)(&blkIdx)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + + return 0; +} + +int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); + + if (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (tdInitDataCols(pReadh->pDCols[1], pSchema) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + + size_t size = taosArrayGetSize(pReadh->aBlkIdx); + if (size > 0) { + while (true) { + if (pReadh->cidx >= size) { + pReadh->pBlockIdx = NULL; + break; + } + + SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx); + if (pBlkIdx->tid == TABLE_TID(pTable)) { + if (pBlkIdx->uid == TABLE_UID(pTable)) { + pReadh->pBlockIdx = pBlkIdx; + } else { + pReadh->pBlockIdx = NULL; + } + pReadh->cidx++; + break; + } else if (pBlkIdx->tid > TABLE_TID(pTable)) { + pReadh->pBlockIdx = NULL; + break; + } else { + pReadh->cidx++; + } + } + } else { + pReadh->pBlockIdx = NULL; + } + + return 0; +} + +int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { + // TODO + return 0; +} + +int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pInfo) { + // TODO + return 0; +} + +int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pInfo, int16_t *colIds, int numOfColsIds) { + // TODO + return 0; +} + +int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { + // TODO + return 0; +} + +int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) { + int tlen = 0; + + tlen += taosEncodeVariantI32(buf, pIdx->tid); + tlen += taosEncodeVariantU32(buf, pIdx->len); + tlen += taosEncodeVariantU32(buf, pIdx->offset); + tlen += taosEncodeFixedU8(buf, pIdx->hasLast); + tlen += 
taosEncodeVariantU32(buf, pIdx->numOfBlocks); + tlen += taosEncodeFixedU64(buf, pIdx->uid); + tlen += taosEncodeFixedU64(buf, pIdx->maxKey); + + return tlen; +} + +void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) { + uint8_t hasLast = 0; + uint32_t numOfBlocks = 0; + uint64_t value = 0; + + if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL; + if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL; + pIdx->hasLast = hasLast; + if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL; + pIdx->numOfBlocks = numOfBlocks; + if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; + pIdx->uid = (int64_t)value; + if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; + pIdx->maxKey = (TSKEY)value; + + return buf; +} -- GitLab