From d2b5bf71e4e46041f8538bc7dd1195dc743f1331 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 17 Oct 2020 09:46:44 +0000 Subject: [PATCH] refactor more code --- src/tsdb/inc/tsdbMain.h | 24 +- src/tsdb/src/tsdbCommit.c | 461 ++++++++++++++++++++++++++---------- src/tsdb/src/tsdbReadUtil.c | 2 - src/tsdb/src/tsdbUtil.c | 19 +- 4 files changed, 362 insertions(+), 144 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index da58b9e83d..ebff9a0935 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -286,8 +286,6 @@ typedef struct { typedef struct { STsdbRepo* pRepo; SFileGroup fGroup; - TSKEY minKey; - TSKEY maxKey; SBlockIdx* pBlockIdx; int nBlockIdx; int cBlockIdx; @@ -301,6 +299,7 @@ typedef struct { } SReadHandle; #define TSDB_BLOCK_DATA_LEN(nCols) (sizeof(SBlockData) + sizeof(SBlockCol) * (nCols) + sizeof(TSCKSUM)) +#define TSDB_BLOCK_INFO_LEN(nBlocks) (sizeof(SBlockInfo) + sizeof(SBlock) * (nBlocks) + sizeof(TSCKSUM)) // Operations // ------------------ tsdbMeta.c @@ -509,8 +508,27 @@ int tsdbLoadBlockDataCols(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBloc int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock); #define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)])) +#define TSDB_KEY_BEYOND_RANGE(key, maxKey) ((key) < 0 || (key) > (maxKey)) -int tsdbAllocBuf(void **ppBuf, uint32_t size); +static FORCE_INLINE int tsdbAllocBuf(void **ppBuf, uint32_t size) { + ASSERT(size > 0); + + void *pBuf = *pBuf; + + uint32_t tsize = taosTSizeof(pBuf); + if (tsize >= size) return 0; + + if (tsize == 0) tsize = 1024; + while (tsize < size) { + tsize *= 2; + } + + *ppBuf = taosTRealloc(pBuf, tsize); + if (*ppBuf == NULL) return -1; +} + +int tsdbEncodeBlockIdx(void** buf, SBlockIdx* pBlockIdx); +void* tsdbDecodeBlockIdx(void* buf, SBlockIdx* pBlockIdx); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index ea36c55d0d..ee294f33a5 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -17,8 +17,9 @@ #include #include -#include "tsdbMain.h" #include "tchecksum.h" +#include "tscompression.h" +#include "tsdbMain.h" #define TSDB_DATA_FILE_CHANGE 0 #define TSDB_META_FILE_CHANGE 1 @@ -31,11 +32,15 @@ typedef struct { SCommitIter *pIters; SReadHandle *pReadH; SFileGroup * pFGroup; + TSKEY minKey; + TSKEY maxKey; SBlockIdx * pBlockIdx; int nBlockIdx; + SBlockIdx newBlockIdx; SBlockInfo * pBlockInfo; + int nBlocks; SBlock * pSubBlock; - int nSubBlock; + int nSubBlocks; SDataCols * pDataCols; } STSCommitHandle; @@ -164,24 +169,28 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { if (pTSCh == NULL) return -1; // Seek skip over data beyond retention - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - tsdbSeekTSCommitHandle(pTSCh, maxKey); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, mfid, &minKey, &maxKey); + tsdbSeekTSCommitHandle(pTSCh, minKey); // Commit Time-Series data file by file int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); for (int fid = sfid; fid <= efid; fid++) { + // Skip files beyond retention if (fid < mfid) continue; - if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue; + if (!tsdbHasDataToCommit(pTSCh, minKey, maxKey)) continue; + // TODO : set pOldGroup and pNewGroup + SFileGroup *pOldGroup = NULL; + SFileGroup *pNewGroup = NULL; if (tsdbLogTSFileChange(pCommitH, fid) < 0) { tsdbFreeTSCommitHandle(pTSCh); return -1; } - if (tsdbCommitToFileGroup(pRepo, NULL, NULL, &tsCommitH) < 0) { + if (tsdbCommitToFileGroup(pTSCh, pOldGroup, pNewGroup) < 0) { tsdbFreeTSCommitHandle(pTSCh); return -1; } @@ -295,8 +304,8 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { free(iters); } -static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileGroup *pNewGroup, STSCommitHandle *pTSCh) { - SCommitIter *iters = pTSCh->pIters; +static int tsdbCommitToFileGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup) { + SCommitIter *pIters = pTSCh->pIters; if (tsdbSetAndOpenCommitFGroup(pTSCh, pOldGroup, pNewGroup) < 0) return -1; @@ -306,7 +315,7 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileG } for (int tid = 1; tid < pTSCh->maxIters; tid++) { - SCommitIter *pIter = iters + tid; + SCommitIter *pIter = pIters + tid; if (pIter->pTable == NULL) continue; if (tsdbCommitTableData(pTSCh, tid) < 0) { @@ -320,12 +329,20 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileG return -1; } + if (tsdbUpdateFileGroupInfo(pNewGroup) < 0) { + tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */); + return -1; + } + tsdbCloseAndUnsetCommitFGroup(pTSCh, false /* hasError = false */); return 0; } -static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { +static int tsdbHasDataToCommit(STSCommitHandle *pTSCh, TSKEY minKey, TSKEY maxKey) { + int nIters = pTSCh->maxIters; + SCommitIter *iters = pTSCh->pIters; + for (int i = 0; i < nIters; i++) { SCommitIter *pIter = iters + i; if (pIter->pTable == NULL) continue; @@ -619,6 +636,7 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { SCommitIter *pIter = pTSCh->pIters + tid; SReadHandle *pReadH = pTSCh->pReadH; SDataCols * pDataCols = pTSCh->pDataCols; + TSKEY keyNext = tsdbNextIterKey(pIter->pIter); taosRLockLatch(&(pIter->pTable->latch)); @@ -627,7 +645,13 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { return -1; } - if (tsdbLoadBlockInfo(pReadH) < 0) { + if (pReadH->pCurBlockIdx == NULL && TSDB_KEY_BEYOND_RANGE(keyNext, pTSCh->maxKey)) { + // no data in memory and no old data in file, just skip the table + taosRUnLockLatch(&(pIter->pTable->latch)); + return 0; + } + + if (tsdbLoadBlockInfo(pReadH, tid) < 0) { taosRUnLockLatch(&(pIter->pTable->latch)); return -1; } @@ -642,6 +666,12 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { return -1; } + // Append a new blockIdx + if (tsdbAppendBlockIdx(pTSCh) < 0) { + taosRUnLockLatch(&(pIter->pTable->latch)); + return -1; + } + taosRUnLockLatch(&(pIter->pTable->latch)); return 0; } @@ -689,23 +719,19 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte return numOfRows; } -static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols, SBlock *pCompBlock) { - STsdbCfg *pCfg = &(pHelper->pRepo->config); +static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock) { + STsdbCfg *pCfg = &(pTSCh->pReadH->pRepo->config); SFile * pFile = NULL; - bool isLast = false; - - ASSERT(pDataCols->numOfRows > 0); + bool islast = false; if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { - pFile = helperDataF(pHelper); + pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA); } else { - isLast = true; - pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? helperNewLastF(pHelper) : helperLastF(pHelper); + islast = true; + pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST); } - ASSERT(pFile->fd > 0); - - if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1; + if (tsdbWriteBlockToFile(pTSCh, pFile, pDataCols, pBlock, islast, true) < 0) return -1; return 0; } @@ -877,8 +903,11 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkI return 0; } -static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, STsdbRepo *pOldGroup, STsdbRepo *pNewGroup) { +static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup) { + ASSERT(pOldGroup->fileId == pNewGroup->fileId); + STsdbRepo *pRepo = pTSCh->pReadH->pRepo; + STsdbCfg * pCfg = &(pRepo->config); if (tsdbSetAndOpenReadFGroup(pTSCh->pReadH, pOldGroup) < 0) { tsdbError("vgId:%d failed to set and open commit file group since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -913,6 +942,8 @@ static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, STsdbRepo *pOldGro pTSCh->pFGroup = pNewGroup; pTSCh->nBlockIdx = 0; + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pOldGroup->fileId, &(pTSCh->minKey), &(pTSCh->maxKey)); + return 0; } @@ -934,12 +965,91 @@ static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) } static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) { - // TODO + ASSERT(pTSCh->nBlocks > 0); + SReadHandle *pReadH = pTSCh->pReadH; + SBlockInfo * pBlockInfo = pTSCh->pBlockInfo; + SFile * pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_HEAD); + int tlen = TSDB_BLOCK_INFO_LEN(pTSCh->nBlocks, pTSCh->nSubBlocks); + + pBlockInfo->delimiter = TSDB_FILE_DELIMITER; + pBlockInfo->uid = TABLE_UID(pReadH->pTable); + pBlockInfo->tid = TABLE_TID(pReadH->pTable); + + if (pTSCh->nSubBlocks > 0) { + if (tsdbAllocBuf(&((void *)pTSCh->pBlockInfo), tlen) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + memcpy(POINTER_SHIFT(pTSCh->pBlockInfo, sizeof(SBlockInfo) + sizeof(SBlock) * pTSCh->nBlocks), + (void *)pTSCh->pSubBlock, sizeof(SBlock) * pTSCh->nSubBlocks); + } + + taosCalcChecksumAppend(0, (uint8_t *)(pTSCh->pBlockInfo), tlen); + + if (taosTWrite(pFile->fd, (void *)pTSCh->pBlockInfo, tlen) < tlen) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), tlen, pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + pTSCh->newBlockIdx.tid = TABLE_TID(pReadH->pTable); + pTSCh->newBlockIdx.uid = TABLE_UID(pReadH->pTable); + pTSCh->newBlockIdx.offset = (uint32_t)(pFile->info.size); + pTSCh->newBlockIdx.numOfBlocks = pTSCh->nBlocks; + pTSCh->newBlockIdx.len = tlen; + pTSCh->newBlockIdx.hasLast = pTSCh->pBlockInfo->blocks[pTSCh->nBlocks - 1].last; + pTSCh->newBlockIdx.maxKey = pTSCh->pBlockInfo->blocks[pTSCh->nBlocks - 1].keyLast; + + pFile->info.size += tlen; + pFile->info.magic = taosCalcChecksum( + pFile->info.magic, (uint8_t *)POINTER_SHIFT(pTSCh->pBlockInfo, tlen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); + return 0; } static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) { - SFile *pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_HEAD); + ASSERT(pTSCh->nBlockIdx > 0); + + SReadHandle *pReadH = pTSCh->pReadH; + STsdbRepo * pRepo = pReadH->pRepo; + SFile * pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_HEAD); + + int len = tsdbEncodeBlockIdxArray(pTSCh); + if (len < 0) return -1; + + // label checksum + len += sizeof(TSCKSUM); + if (tsdbAllocBuf(&(pReadH->pBuf), len) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + taosCalcChecksumAppend(0, (uint8_t *)(pReadH->pBuf), len); + + off_t offset = lseek(pFile->fd, 0, SEEK_END); + if (offset < 0) { + tsdbError("vgId:%d failed to lseek to end of file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (taosTWrite(pFile->fd, pReadH->pBuf, len) < len) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), len, pFile->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + // Update pFile->info + pFile->info.size += len; + pFile->info.offset = (uint32_t)offset; + pFile->info.len = len; + pFile->info.magic = taosCalcChecksum(pFile->info.magic, + (uint8_t *)POINTER_SHIFT(pReadH->pBuf, len - sizeof(TSCKSUM), sizeof(TSCKSUM))); + + ASSERT(pFile->info.size == pFile->info.offset + pFile->info.len); + return 0; } @@ -958,29 +1068,37 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) { SReadHandle *pReadH = pTSCh->pReadH; SDataCols * pDataCols = pTSCh->pDataCols; SBlockIdx * pOldIdx = pReadH->pCurBlockIdx; + TSKEY keyNext = tsdbNextIterKey(pIter->pIter); + + ASSERT((pOldIdx == NULL && (!TSDB_KEY_BEYOND_RANGE(keyNext, pTSCh->maxKey))) || pOldIdx->numOfBlocks > 0); - ASSERT(pOldIdx == NULL || pOldIdx->numOfBlocks > 0); + // Initialize + memset((void *)(&(pTSCh->newBlockIdx)), 0, sizeof(pTSCh->newBlockIdx)); + pTSCh->nBlocks = 0; + pTSCh->nSubBlocks = 0; int sidx = 0; int eidx = (pOldIdx == NULL) ? 0 : pOldIdx->numOfBlocks; while (true) { - TSKEY keyNext = tsdbNextIterKey(pIter->pIter); - if (keyNext > pReadH->maxKey) break; + if (TSDB_KEY_BEYOND_RANGE(keyNext, pTSCh->maxKey)) break; + ASSERT(pTSCh->nBlocks == 0 || keyNext > pTSCh->pBlockInfo->blocks[pTSCh->nBlocks-1].keyLast); - void *ptr = taosbsearch((void *)keyNext, (void *)(pReadH->pBlockInfo->blocks + sidx), eidx - sidx, sizeof(SBlock), - NULL, TD_GE); - if (ptr == NULL) { - if (sidx < eidx && pOldIdx->hasLast) { - ptr = pReadH->pBlockInfo->blocks + eidx - 1; - } + void *ptr = NULL; + if (eidx > sidx) { + ptr = taosbsearch((void *)keyNext, (void *)(pReadH->pBlockInfo->blocks + sidx), eidx - sidx, sizeof(SBlock), + compareKeyBlock, TD_GE); + } + + if (ptr == NULL && sidx < eidx && pOldIdx->hasLast) { + ptr = pReadH->pBlockInfo->blocks + eidx - 1; } int bidx = 0; if (ptr == NULL) { bidx = eidx; } else { - bidx = POINTER_DISTANCE(ptr, (void *)pReadH->pBlockInfo->blocks) / sizeof(SBlock); + bidx = POINTER_DISTANCE(ptr, (void *)(pReadH->pBlockInfo->blocks)) / sizeof(SBlock); } if (tsdbCopyBlocks(pTSCh, sidx, bidx) < 0) return -1; @@ -992,8 +1110,12 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) { if (tsdbMergeCommit(pTSCh, (SBlock *)ptr) < 0) return -1; sidx++; } + + // Update keyNext + keyNext = tsdbNextIterKey(pIter->pIter); } + // Move remaining blocks if (tsdbCopyBlocks(pTSCh, sidx, eidx) < 0) return -1; return 0; @@ -1001,28 +1123,29 @@ static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) { static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx) { ASSERT(sidx <= eidx); + for (int idx = sidx; idx < eidx; idx++) { - // TODO + if (tsdbCopyBlock(pTSCh, idx) < 0) return -1; } + return 0; } static int tsdbAppendCommit(STSCommitHandle *pTSCh) { SDataCols * pDataCols = pTSCh->pDataCols; SReadHandle *pReadH = pTSCh->pReadH; - STsdbRepo * pRepo = pReadH->pRepo; STable * pTable = pReadH->pTable; SBlock block = {0}; SBlock * pBlock = █ - STsdbCfg * pCfg = &(pRepo->config); + STsdbCfg * pCfg = &(pReadH->pRepo->config); SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); - int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); + int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); // default block rows tdResetDataCols(pDataCols); - int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows, pDataCols, NULL, 0); - ASSERT(rowsToRead > 0); + int rows = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, dbrows, pDataCols, NULL, 0); + ASSERT(rows > 0 && pDataCols->numOfRows == rows); - if (tsdbWriteBlockToProperFile(pTSCh, pDataCols, pBlock) < 0) return -1; + if (tsdbWriteBlockToRightFile(pTSCh, pDataCols, pBlock) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, pBlock) < 0) return -1; return -1; @@ -1043,112 +1166,111 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols SReadHandle *pReadH = pTSCh->pReadH; STsdbRepo * pRepo = pReadH->pRepo; STsdbCfg * pCfg = &(pRepo->config); - int64_t offset = 0; - SBlockData * pBlockData = NULL; + int64_t offset = pFile->info.size; int nColsNotAllNull = 0; + int csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); // column size + int32_t keyLen = 0; - ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_END)); + ASSERT(offset == lseek(pFile->fd, 0, SEEK_END)); ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock); ASSERT(isLast ? pDataCols->numOfRows < pCfg->minRowsPerFileBlock : true); - offset = pFile->info.size; - int32_t bsize = TSDB_BLOCK_DATA_LEN(0); // total block size - int32_t csize = 0; // SBlockCol part size + if (tsdbAllocBuf(&((void *)pReadH->pBlockData), csize) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } - for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column + int32_t coffset = 0; // column data offset + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column SDataCol * pDataCol = pDataCols->cols + ncol; + SBlockCol *pBlockCol = NULL; - if (isNEleNull(pDataCol, pDataCols->numOfRows)) { // all data to commit are NULL, just ignore it - continue; - } + if (ncol != 0) { + if (isNEleNull(pDataCol, pDataCols->numOfRows)) { // all data to commit are NULL, just ignore it + continue; + } - nColsNotAllNull++; - bsize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); - if (tsdbAllocBuf(&(pReadH->pBuf), bsize) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - pBlockData = (SBlockData *)(pReadH->pBuf); - SBlockCol *pBlockCol = pBlockData->cols + (nColsNotAllNull - 1); + nColsNotAllNull++; + csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); + if (tsdbAllocBuf(&((void *)pReadH->pBlockData), csize) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } - memset(pBlockCol, 0, sizeof(*pBlockCol)); + pBlockCol = pReadH->pBlockData->cols + nColsNotAllNull - 1; + memset(pBlockCol, 0, sizeof(*pBlockCol)); - pBlockCol->colId = pDataCol->colId; - pBlockCol->type = pDataCol->type; - if (tDataTypeDesc[pDataCol->type].getStatisFunc) { - (*tDataTypeDesc[pDataCol->type].getStatisFunc)( - (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), - &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); + pBlockCol->colId = pDataCol->colId; + pBlockCol->type = pDataCol->type; + if (tDataTypeDesc[pDataCol->type].getStatisFunc) { + (*tDataTypeDesc[pDataCol->type].getStatisFunc)((TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, + pDataCols->numOfRows, &(pBlockCol->min), &(pBlockCol->max), + &(pBlockCol->sum), &(pBlockCol->minIndex), + &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); + } } - } - - csize = bsize; - - ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); - - // Compress the data if neccessary - int tcol = 0; - int32_t toffset = 0; - int32_t keyLen = 0; - for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { - if (ncol != 0 && tcol >= nColsNotAllNull) break; - SDataCol * pDataCol = pDataCols->cols + ncol; - SBlockCol *pBlockCol = pBlockData->cols + tcol; + // compress data if needed + int32_t olen = dataColGetNEleLen(pDataCol, pDataCols->numOfRows); + int32_t blen = olen + COMP_OVERFLOW_BYTES; // allocated buffer length + int32_t clen = 0; - if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue; - void *tptr = POINTER_SHIFT(pBlockData, lsize); + if (tsdbAllocBuf(&(pReadH->pBuf), coffset + blen + sizeof(TSCKSUM)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } - int32_t flen = 0; // final length - int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); + void *pData = POINTER_SHIFT(pReadH->pBuf, coffset); if (pCfg->compression) { if (pCfg->compression == TWO_STAGE_COMP) { - pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES); - if (pHelper->compBuffer == NULL) { + if (tsdbAllocBuf(&(pReadH->pCBuf), blen) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; + return -1; } } - flen = (*(tDataTypeDesc[pDataCol->type].compFunc))( - (char *)pDataCol->pData, tlen, rowsToWrite, tptr, (int32_t)taosTSizeof(pHelper->pBuffer) - lsize, - pCfg->compression, pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)); + clen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, olen, pDataCols->numOfRows, pData, + blen, pCfg->compression, pReadH->pCBuf, blen); } else { - flen = tlen; - memcpy(tptr, pDataCol->pData, flen); + clen = olen; + memcpy(pData, olen); } - // Add checksum - ASSERT(flen > 0); - flen += sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); + ASSERT(clen > 0 && clen <= blen); + + clen += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)pData, clen); pFile->info.magic = - taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); + taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pData, clen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); if (ncol != 0) { - pBlockCol->offset = toffset; - pBlockCol->len = flen; - tcol++; + pReadH->pBlockData->cols[ncol].offset = coffset; + pReadH->pBlockData->cols[ncol].len = clen; } else { - keyLen = flen; + keyLen = clen; } - toffset += flen; - lsize += flen; + coffset += clen; } + ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); + + pReadH->pBlockData->delimiter = TSDB_FILE_DELIMITER; + pReadH->pBlockData->uid = TABLE_UID(pReadH->pTable); + pReadH->pBlockData->numOfCols = nColsNotAllNull; - pBlockData->delimiter = TSDB_FILE_DELIMITER; - pBlockData->uid = pHelper->tableInfo.uid; - pBlockData->numOfCols = nColsNotAllNull; + taosCalcChecksumAppend(0, (uint8_t *)pReadH->pBlockData, csize); + pFile->info.magic = taosCalcChecksum( + pFile->info.magic, (uint8_t *)POINTER_SHIFT(pReadH->pBlockData, csize - sizeof(TSCKSUM)), sizeof(TSCKSUM)); - taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); - pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)), - sizeof(TSCKSUM)); + if (taosTWrite(pFile->fd, (void *)pReadH->pBlockData, csize) < csize) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), csize, pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } - // Write the whole block to file - if (taosTWrite(pFile->fd, (void *)pBlockData, lsize) < lsize) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, pFile->fname, + if (taosTWrite(pFile->fd, pReadH->pBuf, coffset) < coffset) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), coffset, pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -1158,21 +1280,20 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols pBlock->last = isLast; pBlock->offset = offset; pBlock->algorithm = pCfg->compression; - pBlock->numOfRows = rowsToWrite; - pBlock->len = lsize; + pBlock->numOfRows = pDataCols->numOfRows; + pBlock->len = coffset+csize; pBlock->keyLen = keyLen; pBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; pBlock->numOfCols = nColsNotAllNull; pBlock->keyFirst = dataColsKeyFirst(pDataCols); - pBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); + pBlock->keyLast = dataColsKeyLast(pDataCols); + + pFile->info.size += pBlock->len; tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, - REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pBlock->offset), - (int)(pBlock->numOfRows), pBlock->len, pBlock->numOfCols, pBlock->keyFirst, - pBlock->keyLast); - - pFile->info.size += pBlock->len; + REPO_ID(pRepo), TABLE_TID(pReadH->pTable), pFile->fname, (int64_t)(pBlock->offset), + (int)(pBlock->numOfRows), pBlock->len, pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); return 0; } @@ -1319,5 +1440,103 @@ static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { } } } + return 0; +} + +static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh) { + SReadHandle *pReadH = pTSCh->pReadH; + int len = 0; + + for (int i = 0; i < pTSCh->nBlockIdx; i++) { + int tlen = tsdbEncodeBlockIdx(NULL, pTSCh->pBlockIdx + i); + + if (tsdbAllocBuf(&(pReadH->pBuf), tlen + len) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + void *ptr = POINTER_SHIFT(pReadH->pBuf, len); + tsdbEncodeBlockIdx(&ptr, pTSCh->pBlockIdx + i); + + len += tlen; + } + + return len; +} + +static int tsdbUpdateFileGroupInfo(SFileGroup *pFileGroup) { + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = TSDB_FILE_IN_FGROUP(pFileGroup, type); + if (tsdbUpdateFileHeader(pFile) < 0) return -1; + } + + return 0; +} + +static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh) { + if (tsdbAllocBuf(&((void *)(pTSCh->pBlockIdx)), sizeof(SBlockIdx) * (pTSCh->nBlockIdx + 1)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pTSCh->pBlockIdx[pTSCh->nBlockIdx++] = pTSCh->newBlockIdx; + return 0; +} + +static int tsdbCopyBlock(STSCommitHandle *pTSCh, int bidx) { + SReadHandle *pReadH = pTSCh->pReadH; + + SBlock *pBlock = pReadH->pBlockInfo->blocks + bidx; + SFile * pWFile = NULL; + SFile * pRFile = NULL; + SBlock newBlock = {0}; + + if (pBlock->last) { + pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST); + pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST); + } else { + pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA); + pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA); + } + + // TODO: use flag to omit this string compare. this may cause a lot of time + if (strncmp(pWFile->fname, pRFile->fname, TSDB_FILENAME_LEN) == 0) { + if (pBlock->numOfSubBlocks == 1) { + + } else { // need to copy both super block and sub-blocks + + } + } else { + if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; + if (tsdbWriteBlockToFile(pTSCh, pWFile, pReadH->pDataCols[0], &newBlock, pBlock->last, true) < 0) return -1; + // TODO: add a super block + } + + return 0; +} + +static int compareKeyBlock(const void *arg1, const void *arg2) { + TSKEY key = *(TSKEY *)arg1; + SBlock *pBlock = (SBlock *)arg2; + + if (key < pBlock->keyFirst) { + return -1; + } else if (key > pBlock->keyLast) { + return 1; + } + + return 0; +} + +static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { + int tsize = sizeof(SBlockInfo) + sizeof(SBlock) * (pTSCh->nBlocks + 1) + sizeof(TSCKSUM); + if (tsdbAllocBuf(&((void *)pTSCh->pBlockInfo), tsize) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + ASSERT(pTSCh->nBlocks == 0 || pBlock->keyFirst > pTSCh->pBlockInfo->blocks[pTSCh->nBlocks - 1].keyLast); + pTSCh->pBlockInfo->blocks[pTSCh->nBlocks++] = *pBlock; + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c index 0f2c20fcbf..247aa350fd 100644 --- a/src/tsdb/src/tsdbReadUtil.c +++ b/src/tsdb/src/tsdbReadUtil.c @@ -95,8 +95,6 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { } } - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pFGroup->fileId, &(pReadH->minKey), &(pReadH->maxKey)); - return 0; } diff --git a/src/tsdb/src/tsdbUtil.c b/src/tsdb/src/tsdbUtil.c index d2a1bd53aa..0aa5b4c1a1 100644 --- a/src/tsdb/src/tsdbUtil.c +++ b/src/tsdb/src/tsdbUtil.c @@ -111,21 +111,4 @@ void tsdbResetFGroupFd(SFileGroup *pFGroup) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { TSDB_FILE_IN_FGROUP(pFGroup, type)->fd = -1; } -} - -int tsdbAllocBuf(void **ppBuf, uint32_t size) { - ASSERT(size > 0); - - void *pBuf = *pBuf; - - uint32_t tsize = taosTSizeof(pBuf); - if (tsize >= size) return 0; - - if (tsize == 0) tsize = 1024; - while (tsize < size) { - tsize *= 2; - } - - *ppBuf = taosTRealloc(pBuf, tsize); - if (*ppBuf == NULL) return -1; -} +} \ No newline at end of file -- GitLab