diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 8d4949d9b4364fd1e8c70cbb883aa56468724108..810067d7362b8ab9867f43b042d8dae2158702d8 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -278,7 +278,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); -void tdFreeDataCols(SDataCols *pCols); +void *tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index f21205479396ba606a1212f30350df2e0b3f59b5..3da9dd32b4a0a0748cbd75c1183cdf8ec45ecbf1 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -337,12 +337,13 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { return 0; } -void tdFreeDataCols(SDataCols *pCols) { +SDataCols *tdFreeDataCols(SDataCols *pCols) { if (pCols) { tfree(pCols->buf); tfree(pCols->cols); free(pCols); } + return NULL; } SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { diff --git a/src/os/inc/osMemory.h b/src/os/inc/osMemory.h index 439e4cab72e4192d9c13a02519d268586a06f13e..2cf7e14d2f4bc9fc124cdf4de167c5b2cb93f4bb 100644 --- a/src/os/inc/osMemory.h +++ b/src/os/inc/osMemory.h @@ -35,7 +35,7 @@ void taosDumpMemoryLeak(); void * taosTMalloc(size_t size); void * taosTCalloc(size_t nmemb, size_t size); void * taosTRealloc(void *ptr, size_t size); -void taosTZfree(void *ptr); +void * taosTZfree(void *ptr); size_t taosTSizeof(void *ptr); void taosTMemset(void *ptr, int c); diff --git a/src/os/src/detail/osMemory.c b/src/os/src/detail/osMemory.c index 53310d179c0090382e009de949e5158146dc282a..291a54b6695106ba3b457d148b1439e283d6ceff 100644 --- a/src/os/src/detail/osMemory.c +++ b/src/os/src/detail/osMemory.c @@ -512,8 +512,9 @@ void * taosTRealloc(void *ptr, size_t size) { return (void *)((char *)tptr + sizeof(size_t)); } -void taosTZfree(void *ptr) { +void* taosTZfree(void* ptr) { if (ptr) { - free((void *)((char *)ptr - sizeof(size_t))); + free((void*)((char*)ptr - sizeof(size_t))); } + return NULL; } \ No newline at end of file diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 86c89664458b9e63d57f553aaa07f1a108a29d6c..489d2a4a1046df0601073d0f9b25e1212bdd191b 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -365,6 +365,7 @@ typedef struct { SDFile files[TSDB_FILE_MAX]; } SDFileSet; +#define TSDB_FILE_FULL_NAME(f) TFILE_NAME(&((f)->f)) #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id); @@ -632,6 +633,7 @@ static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { #include "tsdbReadImpl.h" +#if 0 // ================= tsdbRWHelper.c typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t; @@ -730,6 +732,8 @@ static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { } } +#endif + // ================= tsdbScan.c typedef struct { SFileGroup fGroup; diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h index 3960ce0b6d89e264143e8b50dd2176ea14745acd..5b099e27e6ba0c0a6cd2430e93e409c40a167201 100644 --- a/src/tsdb/inc/tsdbReadImpl.h +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -78,23 +78,64 @@ typedef struct { struct SReadH { STsdbRepo * pRepo; - SDFileSet * pSet; + SDFileSet rSet; // File set SArray * aBlkIdx; + STable * pTable; // Table info + SBlockIdx * pBlkIdx; int cidx; - STable * pTable; - SBlockIdx * pBlockIdx; SBlockInfo *pBlkInfo; - SBlockData *pBlkData; + SBlockData *pBlkData; // Block info SDataCols * pDCols[2]; void * pBuf; void * pCBuf; }; -#define TSDB_READ_REPO(rh) (rh)->pRepo -#define TSDB_READ_FSET(rh) (rh)->pSet +#define TSDB_READ_REPO(rh) ((rh)->pRepo) +#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) +#define TSDB_READ_FSET(rh) &((rh)->rSet) +#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) +#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) +#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) #define TSDB_READ_BUF(rh) (rh)->pBuf #define TSDB_READ_COMP_BUF(rh) (rh)->pCBuf -#define TSDB_READ_FSET_IS_SET(rh) ((rh)->pSet != NULL) + +#define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) + +int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo); +void tsdbDestroyReadH(SReadH *pReadh); +int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); +void tsdbCloseAndUnsetFSet(SReadH *pReadh); +int tsdbLoadBlockIdx(SReadH *pReadh); +int tsdbSetReadTable(SReadH *pReadh, STable *pTable); +int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); +int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo); +int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo, const int16_t *colIds, + const int numOfColsIds); +int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); +int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); +void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); +void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols); + +static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { + void * pBuf = *ppBuf; + size_t tsize = taosTSizeof(pBuf); + + if (tsize < size) { + if (tsize == 0) tsize = 1024; + + while (tsize < size) { + tsize *= 2; + } + + *ppBuf = taosTRealloc(pBuf, tsize); + if (*ppBuf == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + + return 0; +} #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 276285a3470a7deb2aadef907dfa72be3e90dc2b..f6b9bd5a990b93f63caf0de99e491dee4f3a52ee 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -37,6 +37,12 @@ typedef struct { SDataCols * pDataCols; } SCommitH; +#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh)) +#define TSDB_COMMIT_WRITE_FSET(ch) ((ch)->pWSet) +#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) +#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) +#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) + void *tsdbCommitData(STsdbRepo *pRepo) { if (tsdbStartCommit(pRepo) < 0) { tsdbError("vgId:%d failed to commit data while startting to commit since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -510,5 +516,150 @@ static int tsdbAppendCommit(SCommitIter *pIter, TSKEY keyEnd) { static int tsdbMergeCommit(SCommitIter *pIter, SBlock *pBlock, TSKEY keyEnd) { // TODO + return 0; +} + +static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, + bool isSuper) { + STsdbCfg * pCfg = &(pHelper->pRepo->config); + SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer); + int64_t offset = 0; + int rowsToWrite = pDataCols->numOfRows; + + ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); + ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true); + + offset = lseek(pFile->fd, 0, SEEK_END); + if (offset < 0) { + tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), TSDB_FILE_NAME(pFile), + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + int nColsNotAllNull = 0; + for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column + SDataCol * pDataCol = pDataCols->cols + ncol; + SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull; + + if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it + continue; + } + + memset(pCompCol, 0, sizeof(*pCompCol)); + + pCompCol->colId = pDataCol->colId; + pCompCol->type = pDataCol->type; + if (tDataTypeDesc[pDataCol->type].getStatisFunc) { + (*tDataTypeDesc[pDataCol->type].getStatisFunc)( + (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max), + &(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull)); + } + nColsNotAllNull++; + } + + ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); + + // Compress the data if neccessary + int tcol = 0; + int32_t toffset = 0; + int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull); + int32_t lsize = tsize; + int32_t keyLen = 0; + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + if (ncol != 0 && tcol >= nColsNotAllNull) break; + + SDataCol * pDataCol = pDataCols->cols + ncol; + SBlockCol *pCompCol = pCompData->cols + tcol; + + if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue; + void *tptr = POINTER_SHIFT(pCompData, lsize); + + int32_t flen = 0; // final length + int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); + + if (pCfg->compression) { + if (pCfg->compression == TWO_STAGE_COMP) { + pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES); + if (pHelper->compBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + } + + flen = (*(tDataTypeDesc[pDataCol->type].compFunc))( + (char *)pDataCol->pData, tlen, rowsToWrite, tptr, (int32_t)taosTSizeof(pHelper->pBuffer) - lsize, + pCfg->compression, pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)); + } else { + flen = tlen; + memcpy(tptr, pDataCol->pData, flen); + } + + // Add checksum + ASSERT(flen > 0); + flen += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); + pFile->info.magic = + taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); + + if (ncol != 0) { + pCompCol->offset = toffset; + pCompCol->len = flen; + tcol++; + } else { + keyLen = flen; + } + + toffset += flen; + lsize += flen; + } + + pCompData->delimiter = TSDB_FILE_DELIMITER; + pCompData->uid = pHelper->tableInfo.uid; + pCompData->numOfCols = nColsNotAllNull; + + taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); + pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)), + sizeof(TSCKSUM)); + + // Write the whole block to file + if (taosWrite(pFile->fd, (void *)pCompData, lsize) < lsize) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, + TSDB_FILE_NAME(pFile), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // Update pBlock membership vairables + pBlock->last = isLast; + pBlock->offset = offset; + pBlock->algorithm = pCfg->compression; + pBlock->numOfRows = rowsToWrite; + pBlock->len = lsize; + pBlock->keyLen = keyLen; + pBlock->numOfSubBlocks = isSuper ? 1 : 0; + pBlock->numOfCols = nColsNotAllNull; + pBlock->keyFirst = dataColsKeyFirst(pDataCols); + pBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); + + tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 + " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, + REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, TSDB_FILE_NAME(pFile), (int64_t)(pBlock->offset), + (int)(pBlock->numOfRows), pBlock->len, pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); + + pFile->info.size += pBlock->len; + // ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR)); + + return 0; + +_err: + return -1; +} + +static int tsdbWriteBlockInfo(SCommitH *pCommih) { + SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); + + // TODO + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index a05d979f57ce89472f243029a95ac385ea15f1d0..85352b1dcc18775b7a299ab0a3925e7f71b5b465 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -16,17 +16,74 @@ #include "tchecksum.h" #include "tsdbMain.h" +#define TSDB_KEY_COL_OFFSET 0 + +static void tsdbResetReadH(SReadH *pReadh); +static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols); +static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, + int maxPoints, char *buffer, int bufferSize); +static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, + int numOfColIds); +static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); + int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { - // TODO + ASSERT(pReadh != NULL); + + pReadh->pRepo = pRepo; + + pReadh->aBlkIdx = taosArrayInit(sizeof(SBlockIdx), 1024); + if (pReadh->aBlkIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pReadh->pDCols[0] = tdNewDataCols(); + if (pReadh->pDCols[0] == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyReadH(pReadh); + return -1; + } + + pReadh->pDCols[0] = tdNewDataCols(); + if (pReadh->pDCols[0] == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyReadH(pReadh); + return -1; + } + + pReadh->pDCols[1] = tdNewDataCols(); + if (pReadh->pDCols[1] == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyReadH(pReadh); + return -1; + } + return 0; } void tsdbDestroyReadH(SReadH *pReadh) { - // TODO + if (pReadh == NULL) return; + + pReadh->pCBuf = taosTZfree(pReadh->pCBuf); + pReadh->pBuf = taosTZfree(pReadh->pBuf); + pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]); + pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]); + pReadh->pBlkData = taosTZfree(pReadh->pBlkData); + pReadh->pBlkInfo = tdFreeDataCols(pReadh->pBlkInfo); + pReadh->cidx = 0; + pReadh->pBlkIdx = NULL; + pReadh->pTable = NULL; + pReadh->aBlkIdx = taosArrayDestroy(pReadh->aBlkIdx); + tsdbCloseDFileSet(TSDB_READ_FSET(pReadh)); + pReadh->pRepo = NULL; } int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) { - // TODO + tsdbResetReadH(pReadh); + + pReadh->rSet = *pSet; + if (tsdbOpenDFileSet(TSDB_READ_FSET(pReadh), O_RDONLY) < 0) return -1; + return 0; } @@ -35,41 +92,59 @@ void tsdbCloseAndUnsetFSet(SReadH *pReadh) { } int tsdbLoadBlockIdx(SReadH *pReadh) { - SDFile * pDFile = TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh)); + SDFile * pHeadf = TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh)); SBlockIdx blkIdx; - if (tsdbSeekDFile(pDFile, pDFile->info.offset, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s sinces %s", TSDB_READ_REPO_ID(pReadh), , - tstrerror(terrno)); + ASSERT(taosArrayGetSize(pReadh->aBlkIdx) == 0); + + // No data at all, just return + if (pHeadf->info.offset <= 0) return 0; + + if (tsdbSeekDFile(pHeadf, pHeadf->info.offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s sinces %s, offset:%u len :%u", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pHeadf->info.offset, + pHeadf->info.len); return -1; } - int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pDFile->info.len); + if (tsdbMakeRoom(&(TSDB_READ_BUF(pReadh)), pHeadf->info.len) < 0) return -1; + + int64_t nread = tsdbReadDFile(pHeadf, TSDB_READ_BUF(pReadh), pHeadf->info.len); if (nread < 0) { - tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s sinces %s", TSDB_READ_REPO_ID(pReadh), , - tstrerror(terrno)); + tsdbError("vgId:%d failed to load SBlockIdx part while read file %s sinces %s, offset:%u len :%u", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pHeadf->info.offset, + pHeadf->info.len); return -1; } - if (nread < pDFile->info.len) { - tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s sinces %s", TSDB_READ_REPO_ID(pReadh), , - tstrerror(terrno)); + if (nread < pHeadf->info.len) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d SBlockIdx part in file %s is corrupted, offset:%u expected bytes:%u read bytes: %" PRId64, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len, nread); return -1; } - if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), pDFile->info.len)) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), pHeadf->info.len)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d SBlockIdx part in file %s is corrupted since wrong checksum, offset:%u len :%u", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len); return -1; } void *ptr = TSDB_READ_BUF(pReadh); - while (POINTER_DISTANCE(ptr, TSDB_READ_BUF(pReadh)) < (pDFile->info.len - sizeof(TSCKSUM))) { + int tsize = 0; + while (POINTER_DISTANCE(ptr, TSDB_READ_BUF(pReadh)) < (pHeadf->info.len - sizeof(TSCKSUM))) { ptr = tsdbDecodeSBlockIdx(ptr, &blkIdx); + ASSERT(ptr != NULL); - if (taosArrayPush(pReadh->aBlcIdx, (void *)(&blkIdx)) < 0) { + if (taosArrayPush(pReadh->aBlkIdx, (void *)(&blkIdx)) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } + + tsize++; + ASSERT(tsize == 1 || ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 2))->tid < + ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 1))->tid) } return 0; @@ -121,22 +196,140 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { } int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) { - // TODO + ASSERT(pReadh->pBlkIdx != NULL); + + SDFile * pHeadf = TSDB_READ_HEAD_FILE(pReadh); + SBlockIdx *pBlkIdx = pReadh->pBlkIdx; + + if (tsdbSeekDFile(pHeadf, pBlkIdx->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load SBlockInfo part while seek file %s to offset %u since %s", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, tstrerror(terrno)); + return -1; + } + + if (tsdbMakeRoom((void **)(&(pReadh->pBlkInfo)), pBlkIdx->len) < 0) return -1; + + int64_t nread = tsdbReadDFile(pHeadf, (void *)(pReadh->pBlkInfo), pBlkIdx->len); + if (nread < 0) { + tsdbError("vgId:%d failed to load SBlockInfo part while read file %s sinces %s, offset:%u len :%u", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pBlkIdx->offset, pBlkIdx->len); + return -1; + } + + if (nread < pBlkIdx->len) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d SBlockInfo part in file %s is corrupted, offset:%u expected bytes:%u read bytes: %" PRId64, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len, nread); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkInfo), pBlkIdx->len)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d SBlockInfo part in file %s is corrupted since wrong checksum, offset:%u len :%u", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len); + return -1; + } + + if (pTarget) { + memcpy(pTarget, (void *)pReadh->pBlkInfo, pBlkIdx->len); + } + return 0; } -int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pInfo) { - // TODO +int tsdbLoadBlockData(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo) { + ASSERT(pBlock->numOfSubBlocks > 0); + + const SBlock *iBlock = pBlock; + if (pBlock->numOfSubBlocks > 1) { + if (pBlockInfo) { + iBlock = POINTER_SHIFT(pBlockInfo, pBlock->offset); + } else { + iBlock = POINTER_SHIFT(pReadh->pBlkInfo, pBlock->offset); + } + } + + tdResetDataCols(pReadh->pDCols[0]); + if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0]) < 0) return -1; + for (int i = 1; i < pBlock->numOfSubBlocks; i++) { + tdResetDataCols(pReadh->pDCols[1]); + iBlock++; + if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1]) < 0) return -1; + if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1; + } + + ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); + ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst); + ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast); + return 0; } -int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pInfo, int16_t *colIds, int numOfColsIds) { - // TODO +int tsdbLoadBlockDataCols(SReadH *pReadh, const SBlock *pBlock, const SBlockInfo *pBlockInfo, const int16_t *colIds, + const int numOfColsIds) { + ASSERT(pBlock->numOfSubBlocks > 0); + + const SBlock *iBlock = pBlock; + if (pBlock->numOfSubBlocks > 1) { + if (pBlockInfo) { + iBlock = POINTER_SHIFT(pBlockInfo, pBlock->offset); + } else { + iBlock = POINTER_SHIFT(pReadh->pBlkInfo, pBlock->offset); + } + } + + tdResetDataCols(pReadh->pDCols[0]); + if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds) < 0) return -1; + for (int i = 1; i < pBlock->numOfSubBlocks; i++) { + tdResetDataCols(pReadh->pDCols[1]); + iBlock++; + if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds) < 0) return -1; + if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows) < 0) return -1; + } + + ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows); + ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst); + ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast); + return 0; } int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) { - // TODO + ASSERT(pBlock->numOfSubBlocks <= 1); + + SDFile *pFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); + + if (tsdbSeekDFile(pFile, pBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %u since %s", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlock->offset, tstrerror(terrno)); + return -1; + } + + size_t size = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols); + if (tsdbMakeRoom((void **)(&(pReadh->pBlkData), size)) < 0) return -1; + + int64_t nread = tsdbReadDFile(pFile, (void *)(pReadh->pBlkData), size); + if (nread < 0) { + tsdbError("vgId:%d failed to load block statis part while read file %s sinces %s, offset:%" PRId64 " len :%" PRIzu, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), tstrerror(terrno), pBlock->offset, size); + return -1; + } + + if (nread < size) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu + " read bytes: %" PRId64, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), pBlock->offset, size, nread); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), size)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), pBlock->offset, size); + return -1; + } + return 0; } @@ -173,3 +366,286 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) { return buf; } + +void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols) { + SBlockData *pBlockData = pReadh->pBlkData; + + for (int i = 0, j = 0; i < numOfCols;) { + if (j >= pBlockData->numOfCols) { + pStatis[i].numOfNull = -1; + i++; + continue; + } + + if (pStatis[i].colId == pBlockData->cols[j].colId) { + pStatis[i].sum = pBlockData->cols[j].sum; + pStatis[i].max = pBlockData->cols[j].max; + pStatis[i].min = pBlockData->cols[j].min; + pStatis[i].maxIndex = pBlockData->cols[j].maxIndex; + pStatis[i].minIndex = pBlockData->cols[j].minIndex; + pStatis[i].numOfNull = pBlockData->cols[j].numOfNull; + i++; + j++; + } else if (pStatis[i].colId < pBlockData->cols[j].colId) { + pStatis[i].numOfNull = -1; + i++; + } else { + j++; + } + } +} + +static void tsdbResetReadH(SReadH *pReadh) { + tdResetDataCols(pReadh->pDCols[0]); + tdResetDataCols(pReadh->pDCols[1]); + pReadh->cidx = 0; + pReadh->pBlkIdx = NULL; + pReadh->pTable = NULL; + taosArrayClear(pReadh->aBlkIdx); + tsdbCloseDFileSet(TSDB_READ_FSET(pReadh)); +} + +static int tsdbLoadBlockDataImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols) { + ASSERT(pBlock->numOfSubBlocks >= 0 && pBlock->numOfSubBlocks <= 1); + + SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_HEAD_FILE(pReadh); + + if (tsdbMakeRoom((void **)(&(pReadh->pBuf)), pBlock->len) < 0) return -1; + + SBlockData *pBlockData = (SBlockData *)(pReadh->pBuf); + + if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load block data part while seek file %s to offset %u since %s", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlock->offset, tstrerror(terrno)); + return -1; + } + + int64_t nread = tsdbReadDFile(pDFile, pReadh->pBuf, pBlock->len); + if (nread < 0) { + tsdbError("vgId:%d failed to load block data part while read file %s sinces %s, offset:%" PRId64 " len :%d", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), pBlock->offset, pBlock->len); + return -1; + } + + if (nread < pBlock->len) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block data part in file %s is corrupted, offset:%" PRId64 " expected bytes:%d" PRIzu + " read bytes: %" PRId64, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, pBlock->len, nread); + return -1; + } + + int32_t tsize = TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols); + if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBuf), tsize)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), pBlock->offset, tsize); + return -1; + } + + ASSERT(pBlockData->numOfCols == pBlock->numOfCols); + + pDataCols->numOfRows = pBlock->numOfRows; + + // Recover the data + int ccol = 0; // loop iter for SBlockCol object + int dcol = 0; // loop iter for SDataCols object + while (dcol < pDataCols->numOfCols) { + SDataCol *pDataCol = &(pDataCols->cols[dcol]); + if (dcol != 0 && ccol >= pBlockData->numOfCols) { + // Set current column as NULL and forward + dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints); + dcol++; + continue; + } + + int16_t tcolId = 0; + int32_t toffset = TSDB_KEY_COL_OFFSET; + int32_t tlen = pBlock->keyLen; + + if (dcol != 0) { + SBlockCol *pBlockCol = &(pBlockData->cols[ccol]); + tcolId = pBlockCol->colId; + toffset = pBlockCol->offset; + tlen = pBlockCol->len; + } else { + ASSERT(pDataCol->colId == tcolId); + } + + if (tcolId == pDataCol->colId) { + if (pBlock->algorithm == TWO_STAGE_COMP) { + int zsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; + if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + zsize += (sizeof(VarDataLenT) * pBlock->numOfRows); + } + + if (tsdbMakeRoom((void **)(&(pReadh->pCBuf)), zsize) < 0) return -1; + } + + if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm, + pBlock->numOfRows, pDataCols->maxPoints, pReadh->pCBuf, + (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { + tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %d", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tcolId, (int64_t)pBlock->offset, toffset); + return -1; + } + if (dcol != 0) ccol++; + dcol++; + } else if (tcolId < pDataCol->colId) { + ccol++; + } else { + // Set current column as NULL and forward + dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints); + dcol++; + } + } + + return 0; +} + +static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, + int maxPoints, char *buffer, int bufferSize) { + if (!taosCheckChecksumWhole((uint8_t *)content, len)) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + // Decode the data + if (comp) { + // Need to decompress + int tlen = (*(tDataTypeDesc[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, + pDataCol->spaceSize, comp, buffer, bufferSize); + if (tlen <= 0) { + tsdbError("Failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d", + len, comp, numOfRows, maxPoints, bufferSize); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + pDataCol->len = tlen; + } else { + // No need to decompress, just memcpy it + pDataCol->len = len - sizeof(TSCKSUM); + memcpy(pDataCol->pData, content, pDataCol->len); + } + + if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { + dataColSetOffset(pDataCol, numOfRows); + } + return 0; +} + +static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, const SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds) { + ASSERT(pBlock->numOfSubBlocks <= 1); + ASSERT(colIds[0] == 0); + + SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh); + SBlockCol blockCol = {0}; + + // If only load timestamp column, no need to load SBlockData part + if (numOfColIds > 1 && tsdbLoadBlockStatis(pReadh, pBlock) < 0) return -1; + + pDataCols->numOfRows = pBlock->numOfRows; + + int dcol = 0; + int ccol = 0; + for (int i = 0; i < numOfColIds; i++) { + int16_t colId = colIds[i]; + SDataCol * pDataCol = NULL; + SBlockCol *pBlockCol = NULL; + + while (true) { + if (dcol >= pDataCols->numOfCols) { + pDataCol = NULL; + break; + } + pDataCol = &pDataCols->cols[dcol]; + if (pDataCol->colId > colId) { + pDataCol = NULL; + break; + } else { + dcol++; + if (pDataCol->colId == colId) break; + } + } + + if (pDataCol == NULL) continue; + ASSERT(pDataCol->colId == colId); + + if (colId == 0) { // load the key row + blockCol.colId = colId; + blockCol.len = pBlock->keyLen; + blockCol.type = pDataCol->type; + blockCol.offset = TSDB_KEY_COL_OFFSET; + pBlockCol = &blockCol; + } else { // load non-key rows + while (true) { + if (ccol >= pBlock->numOfCols) { + pBlockCol = NULL; + break; + } + + pBlockCol = &(pReadh->pBlockData->cols[ccol]); + if (pBlockCol->colId > colId) { + pBlockCol = NULL; + break; + } else { + ccol++; + if (pBlockCol->colId == colId) break; + } + } + + if (pBlockCol == NULL) { + dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints); + continue; + } + + ASSERT(pBlockCol->colId == pDataCol->colId); + } + + if (tsdbLoadColData(pReadh, pDFile, pBlock, pBlockCol, pDataCol) < 0) return -1; + } + + return 0; +} + +static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol) { + ASSERT(pDataCol->colId == pBlockCol->colId); + + STsdbRepo *pRepo = TSDB_READ_REPO(pReadh); + STsdbCfg * pCfg = &(pRepo->config); + int tsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; + + if (tsdbMakeRoom((void **)(&(pReadh->pBuf)), pBlockCol->len) < 0) return -1; + if (tsdbMakeRoom((void **)(&(pReadh->pCBuf)), tsize) < 0) return -1; + + int64_t offset = pBlock->offset + TSDB_BLOCK_STATIS_SIZE(pBlock->numOfCols) + pBlockCol->offset; + if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to load block column data while seek file %s to offset %" PRId64 " since %s", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, tstrerror(terrno)); + return -1; + } + + int64_t nread = tsdbReadDFile(pDFile, pReadh->pBuf, pBlockCol->len); + if (nread < 0) { + tsdbError("vgId:%d failed to load block column data while read file %s sinces %s, offset:%" PRId64 " len :%d", + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), offset, pBlockCol->len); + return -1; + } + + if (nread < pBlockCol->len) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + tsdbError("vgId:%d block column data in file %s is corrupted, offset:%" PRId64 " expected bytes:%d" PRIzu + " read bytes: %" PRId64, + TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, pBlockCol->len, nread); + return -1; + } + + if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows, + pCfg->maxRowsPerFileBlock, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { + tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_NAME(pFile), + pBlockCol->colId, offset); + return -1; + } + + return 0; +} \ No newline at end of file diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h index d2f564baaf78aa38547b025684d179f281833278..268d5b89dfbe1d0cef0bf76425cf95070a783021 100644 --- a/src/util/inc/tarray.h +++ b/src/util/inc/tarray.h @@ -132,7 +132,7 @@ void taosArrayClear(SArray* pArray); * destroy array list * @param pArray */ -void taosArrayDestroy(SArray* pArray); +void* taosArrayDestroy(SArray* pArray); /** * diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index 88ce936a0e822f57f0a152503541854413ec4cdc..232b46cf8346d551448056aca231fcc0a85fe331 100644 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -189,13 +189,13 @@ void taosArrayClear(SArray* pArray) { pArray->size = 0; } -void taosArrayDestroy(SArray* pArray) { - if (pArray == NULL) { - return; +void* taosArrayDestroy(SArray* pArray) { + if (pArray) { + free(pArray->pData); + free(pArray); } - free(pArray->pData); - free(pArray); + return NULL; } void taosArrayDestroyEx(SArray* pArray, void (*fp)(void*)) {