diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index badcb7802f510b2978abace6b21a1098e1cdc44d..aaedc7672677dcd2c27d961412955c0ad519d326 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -138,11 +138,10 @@ SListNode *tdListPopNode(SList *list, SListNode *node) { // Move all node elements from src to dst, the dst is assumed as an empty list void tdListMove(SList *src, SList *dst) { // assert(dst->eleSize == src->eleSize); - dst->numOfEles = src->numOfEles; - dst->head = src->head; - dst->tail = src->tail; - src->numOfEles = 0; - src->head = src->tail = NULL; + SListNode *node = NULL; + while ((node = tdListPopHead(src)) != NULL) { + tdListAppendNode(dst, node); + } } void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); } diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 7e358cc0a2479ddfeec9f5cf8aae1f29dee5be19..8c106d1067f92d9eabcb3dbf8eb89081a93f015a 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -122,6 +122,15 @@ typedef struct { } SCompInfo; #define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx)) +#define TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pCompBlock, size)\ +do {\ + if (pCompBlock->numOfSubBlocks > 1) {\ + pCompBlock = pCompInfo->blocks + pCompBlock->offset;\ + size = pCompBlock->numOfSubBlocks;\ + } else {\ + size = 1;\ + }\ +} while (0) // TODO: take pre-calculation into account typedef struct { @@ -147,6 +156,8 @@ int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf); int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf); int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData); +SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); + // TODO: need an API to merge all sub-block data into one void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 9a0db96bd0a4b93b410abe95ef6e4438128fbe4e..5240a99a370727a773422877c6e88a8984ea42dc 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -35,7 +35,6 @@ static int compFGroup(const void *arg1, const void *arg2); static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); -static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); @@ -309,7 +308,7 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; } -static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { +SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) return NULL; void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 984f2ca203c27d17b59e5ee495446c31999dfc60..769fc238153987b4cf14e09ca2692d260fdf9b3a 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -90,6 +90,8 @@ static void * tsdbCommitData(void *arg); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols); static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey); static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); +static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, + int64_t uid); #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_NAME(pRepo, name) @@ -330,13 +332,13 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; pRepo->tsdbCache->mem = NULL; pRepo->tsdbCache->curBlock = NULL; + tsdbUnLockRepo(repo); // TODO: here should set as detached or use join for memory leak pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); pthread_create(&(pRepo->commitThread), &thattr, tsdbCommitData, (void *)repo); - tsdbUnLockRepo(repo); return 0; } @@ -814,7 +816,9 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) } if (!tSkipListIterNext(iters[tid])) { - assert(false); + // No data in this iterator + tSkipListDestroyIter(iters[tid]); + iters[tid] = NULL; } } @@ -839,8 +843,8 @@ static void *tsdbCommitData(void *arg) { } // Create a data column buffer for commit - SDataCols *pCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); - if (pCols == NULL) { + SDataCols *pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); + if (pDataCols == NULL) { // TODO: deal with the error return NULL; } @@ -849,16 +853,15 @@ static void *tsdbCommitData(void *arg) { int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, pCols) < 0) { + if (tsdbCommitToFile(pRepo, fid, iters, pDataCols) < 0) { // TODO: deal with the error here // assert(0); } } - tdFreeDataCols(pCols); + tdFreeDataCols(pDataCols); tsdbDestroyTableIters(iters, pCfg->maxTables); - tsdbLockRepo(arg); tdListMove(pCache->imem->list, pCache->pool.memPool); free(pCache->imem); @@ -918,25 +921,52 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */ } + lseek(hFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET); + // Loop to commit data in each table for (int tid = 0; tid < pCfg->maxTables; tid++) { STable * pTable = pMeta->tables[tid]; SSkipListIterator *pIter = iters[tid]; SCompIdx * pIdx = &pIndices[tid]; + if (pTable == NULL || pIter == NULL) continue; + + /* If no new data to write for this table, just write the old data to new file + * if there are. + */ if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { - // This table does not have data in this range, just copy its head part and last - // part (if neccessary) to new file - if (pIdx->offset > 0) { // has old data + // has old data + if (pIdx->offset > 0) { if (isNewLastFile && pIdx->hasLast) { - // Need to load SCompBlock part and copy to new file + // need to move the last block to new file if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ } if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ } - // TODO: Copy the last block from old last file to new file - // tsdbCopyBlockData() + tdInitDataCols(pCols, pTable->schema); + + SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); + int nBlocks = 0; + + TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks); + + SCompBlock tBlock; + int64_t toffset, tlen; + tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock); + + tsdbWriteBlockToFileImpl(&lFile, pCols, pCols->numOfPoints, &toffset, tlen, pTable->tableId.uid); + pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); + pTBlock->offset = toffset; + pTBlock->len = tlen; + pTBlock->numOfPoints = pCols->numOfPoints; + pTBlock->numOfSubBlocks = 1; + + pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); + if (nBlocks > 1) { + pIdx->len -= (sizeof(SCompBlock) * nBlocks); + } + write(hFile.fd, (void *)pCompInfo, pIdx->len); } else { pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); @@ -951,6 +981,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters if (pIdx->offset > 0) { if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { // has last block || cache key overlap with commit key + pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100); if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ } if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; @@ -984,7 +1015,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters // // SCompBlock *pTBlock = NULL; // } // } - pointsWritten = pCols->numOfPoints; + // pointsWritten = pCols->numOfPoints; tdPopDataColsPoints(pCols, pointsWritten); maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; }