diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c index 1eb65016b001f39ee5432fc5969f10859d282ef3..54966daf4db458e9c3e5979b1b117a4eb02469b1 100644 --- a/src/tsdb/src/tsdbReadUtil.c +++ b/src/tsdb/src/tsdbReadUtil.c @@ -12,10 +12,153 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include +#include +#include + #include "tsdbMain.h" +#include "tchecksum.h" typedef struct { - SFileGroup fGroup; - TSKEY minKey; - TSKEY maxKey; + STsdbRepo * pRepo; + SFileGroup fGroup; + TSKEY minKey; + TSKEY maxKey; + SBlockIdx * pBlockIdx; + int nBlockIdx; + uint64_t uid; + int32_t tid; + SBlockInfo *pBlockInfo; + SDataCols * pDataCols[2]; + void * pBuf; + void * pCBuf; } SReadHandle; + +#define TSDB_READ_FILE(pReadH, type) (&((pReadH)->fGroup.files[(type)])) + +int tsdbInitReadHandle(SReadHandle *pReadH, STsdbRepo *pRepo) { + pReadH->pRepo = pRepo; + return 0; +} + +void tsdbDestroyReadHandle(SReadHandle *pReadH) { + // TODO +} + +int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { + STsdbRepo *pRepo = pReadH->pRepo; + pReadH->fGroup = *pFGroup; + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + pReadH->fGroup.files[type].fd = -1; + } + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = &(pReadH->fGroup.files[type]); + + if (pFile->fname[0] != '\0') { + pFile->fd = open(pFile->fname, O_RDONLY); + if (pFile->fd < 0) { + tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseAndUnsetFile(pReadH); + return -1; + } + } + } + + return 0; +} + +void tsdbCloseAndUnsetFile(SReadHandle *pReadH) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = &(pReadH->fGroup.files[type]); + + if (pFile->fd > 0) { + (void)close(pFile->fd); + pFile->fd = -1; + } + } +} + +int tsdbLoadBlockIdx(SReadHandle *pReadH) { + STsdbRepo *pRepo = pReadH->pRepo; + SFile * pFile = &(pReadH->fGroup.files[TSDB_FILE_TYPE_HEAD]); + + if (pFile->fd < 0 || pFile->info.len == 0) { + pReadH->nBlockIdx = 0; + return 0; + } + + if (tsdbAllocBuf(&(pReadH->pBuf), pFile->info.len) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (lseek(pFile->fd, pFile->info.offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + ssize_t ret = taosTRead(pFile->fd, pReadH->pBuf, pFile->info.len); + if (ret < 0) { + tsdbError("vgId:%d failed to read block idx part from file %s since %s", REPO_ID(pRepo), pFile->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (ret < pFile->info.len || !taosCheckChecksumWhole((uint8_t *)pReadH->pBuf, pFile->info.len)) { + tsdbError("vgId:%d block idx part is corrupted in file %s, offset %u len %u", REPO_ID(pRepo), pFile->fname, + pFile->info.offset, pFile->info.len); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + if (tsdbDecodeBlockIdxArray(pReadH) < 0) { + tsdbError("vgId:%d error occurs while decoding block idx part from file %s", REPO_ID(pRepo), pFile->fname); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + return 0; +} + +static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { + void *pBuf = pReadH->pBuf; + SFile *pFile = &(pReadH->fGroup.files[TSDB_FILE_TYPE_HEAD]); + + pReadH->nBlockIdx = 0; + while (POINTER_DISTANCE(pBuf, pReadH->pBuf) < (int)(pFile->info.len - sizeof(TSCKSUM))) { + if (tsdbAllocBuf(&((void *)(pReadH->pBlockIdx), sizeof(SBlockIdx) * (pReadH->nBlockIdx + 1))) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pBuf = tsdbDecodeBlockIdx(pBuf, &(pReadH->pBlockIdx[pReadH->nBlockIdx])); + if (pBuf == NULL) { + tsdbError("vgId:%d failed to decode block idx part from file %s", REPO_ID(pRepo), pFile->fname); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pReadH->nBlockIdx++; + ASSERT(pReadH->nBlockIdx == 1 || (pReadH->pBlockIdx[pReadH->nBlockIdx-1].tid < (pReadH->pBlockIdx[pReadH->nBlockIdx-2].tid)); + } + return 0; +} + +static int tsdbAllocBuf(void **ppBuf, int size) { + void *pBuf = *pBuf; + + int tsize = taosTSizeof(pBuf); + if (tsize == 0) tsize = 1024; + + while (tsize < size) { + tsize *= 2; + } + + *ppBuf = taosTRealloc(pBuf, tsize); + if (*ppBuf == NULL) return -1; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbUtil.c b/src/tsdb/src/tsdbUtil.c index 68ce673331ce8a5db0784b5a496d944365816253..6ea28e580e2db677686272534669971221a709e1 100644 --- a/src/tsdb/src/tsdbUtil.c +++ b/src/tsdb/src/tsdbUtil.c @@ -68,4 +68,40 @@ int tsdbGetNextSeqNum(int currentNum) { } else { return 0; } +} + +int tsdbEncodeBlockIdx(void **buf, SBlockIdx *pBlockIdx) { + int tlen = 0; + + tlen += taosEncodeVariantI32(buf, pBlockIdx->tid); + tlen += taosEncodeVariantU32(buf, pBlockIdx->len); + tlen += taosEncodeVariantU32(buf, pBlockIdx->offset); + tlen += taosEncodeFixedU8(buf, pBlockIdx->hasLast); + tlen += taosEncodeVariantU32(buf, pBlockIdx->numOfBlocks); + tlen += taosEncodeFixedU64(buf, pBlockIdx->uid); + tlen += taosEncodeFixedU64(buf, pBlockIdx->maxKey); + + return tlen; +} + +void *tsdbDecodeBlockIdx(void *buf, SBlockIdx *pBlockIdx) { + uint8_t hasLast = 0; + uint32_t numOfBlocks = 0; + uint64_t uid = 0; + uint64_t maxKey = 0; + + if ((buf = taosDecodeVariantI32(buf, &(pBlockIdx->tid))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(pBlockIdx->len))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(pBlockIdx->offset))) == NULL) return NULL; + if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL; + if ((buf = taosDecodeFixedU64(buf, &uid)) == NULL) return NULL; + if ((buf = taosDecodeFixedU64(buf, &maxKey)) == NULL) return NULL; + + pBlockIdx->hasLast = hasLast; + pBlockIdx->numOfBlocks = numOfBlocks; + pBlockIdx->uid = value; + pBlockIdx->maxKey = (TSKEY)maxKey; + + return buf; } \ No newline at end of file