From bed8890fe808c67f8fc4046dbb41658e99a851ef Mon Sep 17 00:00:00 2001 From: hzcheng Date: Wed, 8 Apr 2020 18:04:03 +0800 Subject: [PATCH] TD-100 --- src/vnode/tsdb/inc/tsdbMain.h | 35 +- src/vnode/tsdb/src/tsdbMain.c | 10 +- src/vnode/tsdb/src/tsdbRWHelper.c | 628 ++++++------------------------ 3 files changed, 134 insertions(+), 539 deletions(-) diff --git a/src/vnode/tsdb/inc/tsdbMain.h b/src/vnode/tsdb/inc/tsdbMain.h index 54f5ab0cce..949bb9d48b 100644 --- a/src/vnode/tsdb/inc/tsdbMain.h +++ b/src/vnode/tsdb/inc/tsdbMain.h @@ -385,10 +385,11 @@ typedef struct { SHelperFile files; SHelperTable tableInfo; + SCompIdx compIdx; // SCompIdx of current table - // ---------- For read purpose int8_t state; // current loading state + // Information in .head file SCompIdx *pCompIdx; size_t compIdxSize; @@ -396,33 +397,24 @@ typedef struct { size_t compInfoSize; int blockIter; // For write purpose + // Information in .data or .last file SCompData *pCompData; size_t compDataSize; SDataCols *pDataCols[2]; - // ---------- For read purpose - bool hasLast; - - int newBlocks; - SCompIdx *pWCompIdx; - size_t wCompIdxSize; - - SCompInfo *pWCompInfo; - size_t wCompInfoSize; - - SCompData *pWCompData; - size_t wCompDataSize; + // Compression buffer + void * cBuffer; + size_t cBufSize; } SRWHelper; // --------- Helper state -#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state -#define TSDB_HELPER_FILE_SET 0x1 // File is set -#define TSDB_HELPER_FILE_OPEN 0x2 // File is opened - -#define TSDB_HELPER_IDX_LOAD 0x4 // SCompIdx part is loaded -#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded -#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded +#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state +#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set +#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded +#define TSDB_HELPER_TABLE_SET 0x4 // Table is set +#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded +#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded #define TSDB_HELPER_TYPE(h) ((h)->config.type) @@ -435,8 +427,7 @@ void tsdbDestroyHelper(SRWHelper *pHelper); void tsdbClearHelper(SRWHelper *pHelper); // --------- For set operations -int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup); -int tsdbOpenHelperFile(SRWHelper *pHelper); +int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup); void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema); int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError); diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index b5d573764e..1596cd54d3 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -889,23 +889,20 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters tsdbGetDataDirName(pRepo, dataDir); if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) goto _err; - // Set the file to write/read - tsdbSetHelperFile(pHelper, pGroup); - // Open files for write/read - if (tsdbOpenHelperFile(pHelper) < 0) goto _err; + if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) goto _err; // Loop to commit data in each table for (int tid = 0; tid < pCfg->maxTables; tid++) { STable * pTable = pMeta->tables[tid]; SSkipListIterator *pIter = iters[tid]; + // Set the helper and the buffer dataCols object to help to write this table SHelperTable hTable = {.uid = pTable->tableId.uid, .tid = pTable->tableId.tid, .sversion = pTable->sversion}; tsdbSetHelperTable(pHelper, &hTable, tsdbGetTableSchema(pMeta, pTable)); tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable)); - // Loop to write the data in the cache to files, if no data to write, just break - // the loop + // Loop to write the data in the cache to files. If no data to write, just break the loop int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; while (true) { int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols); @@ -939,6 +936,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters return 0; _err: + ASSERT(false); tsdbCloseHelperFile(pHelper, 1); return -1; } diff --git a/src/vnode/tsdb/src/tsdbRWHelper.c b/src/vnode/tsdb/src/tsdbRWHelper.c index 79fe3c49f7..2e01d9efec 100644 --- a/src/vnode/tsdb/src/tsdbRWHelper.c +++ b/src/vnode/tsdb/src/tsdbRWHelper.c @@ -35,7 +35,7 @@ static void tsdbClearHelperRead(SRWHelper *pHelper); static void tsdbClearHelperWrite(SRWHelper *pHelper); static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, - bool isLast); + bool isLast, bool isSuperBlock); static int compareKeyBlock(const void *arg1, const void *arg2); static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); static int nRowsLEThan(SDataCols *pDataCols, int maxKey); @@ -78,17 +78,22 @@ void tsdbClearHelper(SRWHelper *pHelper) { tsdbClearHelperWrite(pHelper); } -int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { - // TODO: reset the helper object +int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { + ASSERT(pHelper != NULL && pGroup != NULL); - pHelper->files.fid = pGroup->fileId; + // Clear the helper object + tsdbClearHelper(pHelper); + + ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE); + // Set the files + pHelper->files.fid = pGroup->fileId; pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD]; pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA]; pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; - if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) { char *fnameDup = strdup(pHelper->files.headF.fname); + if (fnameDup == NULL) goto _err; if (fnameDup == NULL) return -1; char *dataDir = dirname(fnameDup); @@ -96,32 +101,33 @@ int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { tsdbGetFileName(dataDir, pHelper->files.fid, ".l", pHelper->files.nLastF.fname); free((void *)fnameDup); } - return 0; -} - -int tsdbOpenHelperFile(SRWHelper *pHelper) { - // TODO: check if the file is set - {} + // Open the files + if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err; if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) { - if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err; if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err; if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err; - // TODO: need to write head and compIdx part + + // Create and open .h if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) goto _err; + size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables; + if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, tsize) < tsize) goto _err; + + // Create and open .l file if should if (tsdbShouldCreateNewLast(pHelper)) { if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err; + if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) goto _err; } } else { - if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err; if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err; if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err; } - return 0; + helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN); -_err: - tsdbCloseHelperFile(pHelper, true); + return tsdbLoadCompIdx(pHelper, NULL); + + _err: return -1; } @@ -153,31 +159,44 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { } void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema) { - // TODO: check if it is available to set the table + ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN)); + + // Clear members and state used by previous table + pHelper->blockIter = 0; + pHelper->state &= (TSDB_HELPER_TABLE_SET - 1); pHelper->tableInfo = *pHelperTable; - // TODO: Set the pDataCols according to schema + tdInitDataCols(pHelper->pDataCols[0], pSchema); + tdInitDataCols(pHelper->pDataCols[1], pSchema); + + pHelper->compIdx = pHelper->pCompIdx[pHelper->tableInfo.tid]; - // TODO: set state + helperSetState(pHelper, TSDB_HELPER_TABLE_SET); } +/** + * Write part of of points from pDataCols to file + * + * @return: number of points written to file successfully + * -1 for failure + */ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER); - ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET) && helperHasState(pHelper, TSDB_HELPER_FILE_OPEN)); + SCompBlock compBlock; int rowsToWrite = 0; TSKEY keyFirst = dataColsKeyFirst(pDataCols); - // Load SCompIdx part if not loaded yet - if ((!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) && (tsdbLoadCompIdx(pHelper, NULL) < 0)) goto _err; + ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)); + SCompIdx curIdx = pHelper->compIdx; // old table SCompIdx for sendfile usage + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose // Load the SCompInfo part if neccessary - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - if ((pIdx->offset > 0) && (pIdx->hasLast || dataColsKeyFirst(pDataCols) <= pIdx->maxKey)) { - if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err; - } - - SCompIdx *pWIdx = pHelper->pWCompIdx + pHelper->tableInfo.tid; + ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); + if ((!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) && + ((pIdx->offset > 0) && (pIdx->hasLast || dataColsKeyFirst(pDataCols) <= pIdx->maxKey)) && + (tsdbLoadCompInfo(pHelper, NULL) < 0)) + goto _err; if (!pIdx->hasLast && keyFirst > pIdx->maxKey) { // Just need to append as a super block @@ -189,10 +208,10 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { pWFile = &(pHelper->files.dataF); } else { isLast = true; - pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.nLastF); + pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); } - if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast) < 0) goto _err; + if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err; // TODO: may need to reallocate the memory pHelper->pCompInfo->blocks[pHelper->blockIter++] = compBlock; @@ -259,39 +278,44 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { } int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { - // TODO: check helper state - ASSERT(!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)); + ASSERT(pHelper->state = TSDB_HELPER_FILE_SET_AND_OPEN); - int fd = pHelper->files.headF.fd; + if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { + // If not load from file, just load it in object + int fd = pHelper->files.headF.fd; - if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; - if (tread(fd, pHelper->pCompIdx, pHelper->compIdxSize) < pHelper->compIdxSize) return -1; - // TODO: check the checksum + if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; + if (tread(fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1; + // TODO: check the correctness of the part + } + helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); + // Copy the memory for outside usage if (target) memcpy(target, pHelper->pCompIdx, pHelper->compIdxSize); - helperSetState(pHelper, TSDB_HELPER_IDX_LOAD); return 0; } int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); + + SCompIdx curCompIdx = pHelper->compIdx; - ASSERT(pIdx->offset > 0); + ASSERT(curCompIdx.offset > 0 && curCompIdx.len > 0); int fd = pHelper->files.headF.fd; - if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1; - ASSERT(pIdx->len > 0); + if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { + if (lseek(fd, curCompIdx.offset, SEEK_SET) < 0) return -1; - adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, pIdx->len); - if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < 0) return -1; - // TODO: check the checksum + adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, curCompIdx.len); + if (tread(fd, (void *)(pHelper->pCompInfo), pHelper) < 0) return -1; + // TODO: check the checksum - // TODO: think about when target has no space for the content - if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len); + helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); + } - helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); + if (target) memcpy(target, (void *)(pHelper->pCompInfo), curCompIdx.len); return 0; } @@ -376,21 +400,21 @@ static void tsdbDestroyHelperRead(SRWHelper *pHelper) { static int tsdbInitHelperWrite(SRWHelper *pHelper) { SHelperCfg *pCfg = &(pHelper->config); - pHelper->wCompIdxSize = pCfg->maxTables * sizeof(SCompIdx); - if ((pHelper->pWCompIdx = (SCompIdx *)malloc(pHelper->wCompIdxSize)) == NULL) return -1; + // pHelper->wCompIdxSize = pCfg->maxTables * sizeof(SCompIdx); + // if ((pHelper->pWCompIdx = (SCompIdx *)malloc(pHelper->wCompIdxSize)) == NULL) return -1; return 0; } static void tsdbDestroyHelperWrite(SRWHelper *pHelper) { - tfree(pHelper->pWCompIdx); - pHelper->wCompIdxSize = 0; + // tfree(pHelper->pWCompIdx); + // pHelper->wCompIdxSize = 0; - tfree(pHelper->pWCompInfo); - pHelper->wCompInfoSize = 0; + // tfree(pHelper->pWCompInfo); + // pHelper->wCompInfoSize = 0; - tfree(pHelper->pWCompData); - pHelper->wCompDataSize = 0; + // tfree(pHelper->pWCompData); + // pHelper->wCompDataSize = 0; } static void tsdbClearHelperRead(SRWHelper *pHelper) { @@ -407,17 +431,21 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { } static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, - bool isLast) { - ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints); + bool isLast, bool isSuperBlock) { + ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints && + rowsToWrite <= pHelper->config.maxRowsPerFileBlock); + + SCompData *pCompData = NULL; + int64_t offset = 0; - int64_t offset = lseek(pFile->fd, 0, SEEK_END); + offset = lseek(pFile->fd, 0, SEEK_END); if (offset < 0) goto _err; - SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols); + pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols); if (pCompData == NULL) goto _err; int nColsNotAllNull = 0; - int32_t toffset; + int32_t toffset = 0; for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { SDataCol *pDataCol = pDataCols->cols + ncol; SCompCol *pCompCol = pCompData->cols + nColsNotAllNull; @@ -427,481 +455,59 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa continue; } + // Compress the data here + {} + pCompCol->colId = pDataCol->colId; pCompCol->type = pDataCol->type; - pCompCol->len = pDataCol->len; + pCompCol->len = TYPE_BYTES[pCompCol->type] * rowsToWrite; // TODO: change it pCompCol->offset = toffset; nColsNotAllNull++; + toffset += pCompCol->len; } + ASSERT(nColsNotAllNull > 0); + pCompData->delimiter = TSDB_FILE_DELIMITER; pCompData->uid = pHelper->tableInfo.uid; pCompData->numOfCols = nColsNotAllNull; size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull; if (twrite(pFile->fd, (void *)pCompData, tsize) < tsize) goto _err; - for (int i = 0; i < pDataCols->numOfCols; i++) { - SDataCol *pDataCol = pCompData->cols + i; - SCompCol *pCompCol = NULL; - if (twrite(pFile->fd, (void *)(pDataCol->pData), pCompCol->len) < pCompCol->len) goto _err; + int nCompCol = 0; + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + ASSERT(nCompCol < nColsNotAllNull); + + SDataCol *pDataCol = pDataCols->cols + ncol; + SCompCol *pCompCol = pCompData->cols + nCompCol; + + if (pDataCol->colId == pCompCol->colId) { + if (twrite(pFile->fd, (void *)(pDataCol->pData), pCompCol->len) < pCompCol->len) goto _err; + tsize += pCompCol->len; + nCompCol++; + } } pCompBlock->last = isLast; pCompBlock->offset = offset; - // pCOmpBlock->algorithm = ; + pCompBlock->algorithm = 2; // TODO pCompBlock->numOfPoints = rowsToWrite; pCompBlock->sversion = pHelper->tableInfo.sversion; - // pCompBlock->len = ; - // pCompBlock->numOfSubBlocks = ; + pCompBlock->len = (int32_t)tsize; + pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; pCompBlock->numOfCols = nColsNotAllNull; - // pCompBlock->keyFirst = ; - // pCompBlock->keyLast = ; + pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); + pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); + tfree(pCompData); return 0; _err: + tfree(pCompData); return -1; } -// static int compareKeyBlock(const void *arg1, const void *arg2); - -// /** -// * Init a read-write helper object for read or write usage. -// */ -// int tsdbInitHelper(SRWHelper *pHelper, int maxTables, tsdb_rwhelper_t type, int maxRowSize, int maxRows, -// int maxCols) { -// if (pHelper == NULL) return -1; - -// memset((void *)pHelper, 0, sizeof(SRWHelper)); -// for (int ftype = TSDB_RW_HEADF; ftype <= TSDB_RW_LF; ftype++) { -// pHelper->files[ftype] = -1; -// } - -// // Set type -// pHelper->type = type; - -// // Set global configuration -// pHelper->maxTables = maxTables; -// pHelper->maxRowSize = maxRowSize; -// pHelper->maxRows = maxRows; -// pHelper->maxCols = maxCols; - -// // Allocate SCompIdx part memory -// pHelper->compIdxSize = sizeof(SCompIdx) * maxTables; -// pHelper->pCompIdx = (SCompIdx *)malloc(pHelper->compIdxSize); -// if (pHelper->pCompIdx == NULL) goto _err; - -// pHelper->compDataSize = sizeof(SCompData) + sizeof(SCompCol) * maxCols; -// pHelper->pCompData = (SCompData *)malloc(pHelper->compDataSize); - -// pHelper->pDataCols = tdNewDataCols(maxRowSize, maxCols, maxRows); -// if (pHelper->pDataCols == NULL) goto _err; - -// return 0; - -// _err: -// tsdbDestroyHelper(pHelper); -// return -1; -// } - -// void tsdbResetHelper(SRWHelper *pHelper) { -// if (pHelper->headF.fd > 0) { -// close(pHelper->headF.fd); -// pHelper->headF.fd = -1; -// } -// if (pHelper->dataF.fd > 0) { -// close(pHelper->dataF.fd); -// pHelper->dataF.fd = -1; -// } -// if (pHelper->lastF.fd > 0) { -// close(pHelper->lastF.fd); -// pHelper->lastF.fd = -1; -// } -// if (pHelper->hF.fd > 0) { -// close(pHelper->hF.fd); -// pHelper->hF.fd = -1; -// } -// if (pHelper->lF.fd > 0) { -// close(pHelper->lF.fd); -// pHelper->lF.fd = -1; -// } - -// pHelper->state = 0; -// tdResetDataCols(pHelper->pDataCols); -// } - -// int tsdbDestroyHelper(SRWHelper *pHelper) { -// if (pHelper->headF.fd > 0) close(pHelper->headF.fd); -// if (pHelper->dataF.fd > 0) close(pHelper->dataF.fd); -// if (pHelper->lastF.fd > 0) close(pHelper->lastF.fd); -// if (pHelper->hF.fd > 0) close(pHelper->hF.fd); -// if (pHelper->lF.fd > 0) close(pHelper->lF.fd); - -// if (pHelper->pCompIdx) free(pHelper->pCompIdx); -// if (pHelper->pCompInfo) free(pHelper->pCompInfo); -// if (pHelper->pCompData) free(pHelper->pCompData); -// memset((void *)pHelper, 0, sizeof(SRWHelper)); -// return 0; -// } - -// int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { -// if (pHelper->state != 0) return -1; - -// pHelper->fid = pGroup->fileId; - -// pHelper->headF = pGroup->files[TSDB_FILE_TYPE_HEAD]; -// pHelper->headF.fd = -1; -// pHelper->dataF = pGroup->files[TSDB_FILE_TYPE_DATA]; -// pHelper->dataF.fd = -1; -// pHelper->lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; -// pHelper->lastF.fd = -1; - -// if (pHelper->mode == TSDB_WRITE_HELPER) { -// char *fnameCpy = strdup(pHelper->headF.fname); -// if (fnameCpy == NULL) return -1; -// char *dataDir = dirname(fnameCpy); - -// memset((void *)(&pHelper->hF), 0, sizeof(SFile)); -// memset((void *)(&pHelper->lF), 0, sizeof(SFile)); -// pHelper->hF.fd = -1; -// pHelper->lF.fd = -1; - -// tsdbGetFileName(dataDir, pHelper->fid, ".h", pHelper->hF.fname); -// tsdbGetFileName(dataDir, pHelper->fid, ".l", pHelper->lF.fname); -// free((char *)fnameCpy); -// } - -// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_FILE_SET); - -// return 0; -// } - -// static int tsdbNeedToCreateNewLastFile() { -// // TODO -// return 0; -// } - -// int tsdbCloseHelperFile(SRWHelper *pHelper, int hasErr) { -// int ret = 0; -// if (pHelper->headF.fd > 0) { -// close(pHelper->headF.fd); -// pHelper->headF.fd = -1; -// } -// if (pHelper->dataF.fd > 0) { -// close(pHelper->dataF.fd); -// pHelper->dataF.fd = -1; -// } -// if (pHelper->lastF.fd > 0) { -// close(pHelper->lastF.fd); -// pHelper->lastF.fd = -1; -// } -// if (pHelper->hF.fd > 0) { -// close(pHelper->hF.fd); -// pHelper->hF.fd = -1; -// if (hasErr) remove(pHelper->hF.fname); -// } -// if (pHelper->lF.fd > 0) { -// close(pHelper->lF.fd); -// pHelper->lF.fd = -1; -// if (hasErr) remove(pHelper->hF.fname); -// } -// return 0; -// } - -// int tsdbOpenHelperFile(SRWHelper *pHelper) { -// if (pHelper->state != TSDB_RWHELPER_FILE_SET) return -1; - -// if (pHelper->mode == TSDB_READ_HELPER) { // The read helper -// if (tsdbOpenFile(&pHelper->headF, O_RDONLY) < 0) goto _err; -// if (tsdbOpenFile(&pHelper->dataF, O_RDONLY) < 0) goto _err; -// if (tsdbOpenFile(&pHelper->lastF, O_RDONLY) < 0) goto _err; -// } else { -// if (tsdbOpenFile(&pHelper->headF, O_RDONLY) < 0) goto _err; -// if (tsdbOpenFile(&pHelper->dataF, O_RDWR) < 0) goto _err; -// if (tsdbOpenFile(&pHelper->lastF, O_RDWR) < 0) goto _err; -// // Open .h and .l file -// if (tsdbOpenFile(&pHelper->hF, O_WRONLY | O_CREAT) < 0) goto _err; -// if (tsdbNeedToCreateNewLastFile()) { -// if (tsdbOpenFile(&pHelper->lF, O_WRONLY | O_CREAT) < 0) goto _err; -// } -// } - -// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_FILE_OPENED); - -// return 0; -// _err: -// tsdbCloseHelperFile(pHelper, 1); -// return -1; -// } - -// int tsdbLoadCompIdx(SRWHelper *pHelper) { -// if (pHelper->state != (TSDB_RWHELPER_FILE_SET | TSDB_RWHELPER_FILE_OPENED)) return -1; - -// if (lseek(pHelper->headF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; -// if (tread(pHelper->headF.fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1; - -// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPIDX_LOADED); - -// return 0; -// } - -// int tsdbSetHelperTable(SRWHelper *pHelper, int32_t tid, int64_t uid, STSchema *pSchema) { -// // TODO: add some check information -// pHelper->tid = tid; -// pHelper->uid = uid; - -// tdInitDataCols(pHelper->pDataCols, pSchema); - -// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_TABLE_SET); - -// return 0; -// } - -// int tsdbLoadCompBlocks(SRWHelper *pHelper) { -// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid; -// if (pIdx->offset <= 0) return 0; - -// if (lseek(pHelper->headF.fd, pIdx->offset, SEEK_SET) < 0) return -1; -// if (pHelper->compInfoSize < pIdx->len) { -// pHelper->pCompInfo = (SCompInfo *)realloc((void *)(pHelper->pCompInfo), pIdx->len); -// if (pHelper->pCompInfo == NULL) return -1; -// pHelper->compInfoSize = pIdx->len; -// } - -// if (tread(pHelper->headF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; - -// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPBLOCK_LOADED); - -// return 0; -// } - -// int tsdbRWHelperSetBlockIdx(SRWHelper *pHelper, int blkIdx) { -// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid; -// if (blkIdx > pIdx->numOfSuperBlocks) return -1; - -// pHelper->blkIdx = blkIdx; - -// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_BLOCKIDX_SET); - -// return 0; -// } - -// int tsdbRWHelperLoadCompData(SRWHelper *pHelper) { -// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx; - -// if (pBlock->numOfSubBlocks == 1) { // Only one super block -// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols; -// if (size > pHelper->compDataSize) { -// pHelper->pCompData = (SCompData *)realloc((void *)pHelper->pCompData, size); -// if (pHelper->pCompData == NULL) return -1; -// pHelper->compDataSize = size; -// } - -// if (lseek(pHelper->dataF.fd, pBlock->offset, SEEK_SET) < 0) return -1; -// if (tread(pHelper->dataF.fd, (void *)(pHelper->pCompData), size) < size) return -1; -// } else { // TODO: More sub blocks -// } - -// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPCOL_LOADED); - -// return 0; -// } - - -// static int compColIdCompCol(const void *arg1, const void *arg2) { -// int colId = *(int *)arg1; -// SCompCol *pCompCol = (SCompCol *)arg2; - -// return (int)(colId - pCompCol->colId); -// } - -// static int compColIdDataCol(const void *arg1, const void *arg2) { -// int colId = *(int *)arg1; -// SDataCol *pDataCol = (SDataCol *)arg2; - -// return (int)(colId - pDataCol->colId); -// } - -// int tsdbRWHelperLoadColData(SRWHelper *pHelper, int colId) { -// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx; - -// if (pBlock->numOfSubBlocks == 1) { // only one super block -// SCompCol *pCompCol = bsearch((void *)(&colId), (void *)(pHelper->pCompData->cols), pBlock->numOfCols, compColIdCompCol, compColIdCompCol); -// if (pCompCol == NULL) return 0; // No data to read from this block , but we still return 0 - -// SDataCol *pDataCol = bsearch((void *)(&colId), (void *)(pHelper->pDataCols->cols), pHelper->pDataCols->numOfCols, sizeof(SDataCol), compColIdDataCol); -// assert(pDataCol != NULL); - -// int fd = (pBlock->last) ? pHelper->lastF.fd : pHelper->dataF.fd; -// if (lseek(fd, pBlock->offset + pCompCol->offset, SEEK_SET) < 0) return -1; -// if (tread(fd, (void *)pDataCol->pData, pCompCol->len) < pCompCol->len) return -1; -// pDataCol->len = pCompCol->len; -// } else { -// // TODO: more than 1 blocks -// } -// return 0; -// } - -// int tsdbRWHelperLoadBlockData(SRWHelper *pHelper, int blkIdx) { -// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx; - -// if (pBlock->numOfSubBlocks == 1) { -// for (int i = 0; i < pHelper->pDataCols->numOfCols; i++) { -// if (tsdbRWHelperLoadBlockData(pHelper, pHelper->pDataCols->cols[i].colId) < 0) return -1; -// } -// } else { -// // TODO: more than 1 block of data -// } -// return 0; -// } - -// int tsdbRWHelperCopyCompBlockPart(SRWHelper *pHelper) { -// // TODO -// return 0; -// } - -// int tsdbRWHelperCopyDataBlockPart(SRWHelper *pHelper ) { -// // TODO -// return 0; -// } - -// int tsdbRWHelperWriteCompIdx(SRWHelper *pHelper) { -// // TODO -// if (lseek(pHelper->hF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1; -// if (twrite(pHelper->hF.fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1; - -// return 0; -// } - -// /** -// * Load the data block from file -// * -// * @return 0 for success -// * -1 for failure -// */ -// int tsdbLoadDataBlock(SRWHelper *pHelper, int bldIdx) { -// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid; -// if (pIdx->) - -// return 0; -// } - -// /** -// * Append the block to a file, either .data -// */ -// int tsdbAppendBlockToFile(SRWHelper *pHelper, tsdb_rw_file_t toFile, SDataCols *pDataCols, SCompBlock *pCompBlock, bool isSuper) { -// SFile *pFile = pHelper->files + toFile; - -// int64_t offset = lseek(pFile->fd, 0, SEEK_END); -// if (*offset < 0) return -1; - -// SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols); -// if (pCompData == NULL) return -1; - - -// int numOfNotAllNullCols = 0; -// int32_t toffset = 0; -// for (int i = 0; i < pDataCols->numOfCols; i++) { -// SDataCol *pDataCol = pDataCols->cols + i; -// SCompCol *pCompCol = pCompData->cols + numOfNotAllNullCols; - -// if (0 /* All data in this column are NULL value */) { -// continue; -// } -// pCompCol->colId = pDataCol->colId; -// pCompCol->type = pDataCol->type; -// pCompCol->len = pDataCol->len; -// // pCompCol->offset = toffset; -// numOfNotAllNullCols++; -// // toffset += pDataCol->len; -// } - -// pCompData->delimiter = TSDB_FILE_DELIMITER; -// pCompData->numOfCols = numOfNotAllNullCols; -// pCompData->uid = pHelper->uid; - -// size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * numOfNotAllNullCols; -// if (twrite(pFile->fd, (void *)pCompData, tsize) < 0) return -1; -// for (int i = 0; i < numOfNotAllNullCols; i++) { -// SCompCol *pCompCol = pCompData->cols + i; -// SDataCol *pDataCol = NULL; // bsearch() -// tassert(pDataCol != NULL); -// if (twrite(pFile->fd, (void *)(pDataCol->pData), pDataCol->len) < pDataCol->len) return -1; -// } - -// pCompBlock->last = (toFile == TSDB_RW_DATAF) ? 0 : 1; -// pCompBlock->offset = offset; -// pCompBlock->algorithm = pHelper->compression; -// pCompBlock->numOfPoints = pDataCols->numOfPoints; -// pCompBlock->sversion = pHelper->sversion; -// // pCompBlock->len = ; -// pCompBlock->numOfSubBlocks = isSuper ? 1 : 0; -// pCompBlock->numOfCols = numOfNotAllNullCols; -// pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); -// pCompBlock->keyLast = dataColsKeyLast(pDataCols); - -// return 0; -// } - -// /** -// * Write the whole or part of the cached data block to file. -// * -// * There are four options: -// * 1. Append the whole block as a SUPER-BLOCK at the end -// * 2. Append part/whole block as a SUPER-BLOCK and insert in the middle -// * 3. Append part/whole block as a SUB-BLOCK -// * 4. Merge part/whole block as a SUPER-BLOCK -// */ -// int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { -// tassert(pHelper->type == TSDB_WRITE_HELPER); - -// int rowsWritten = 0; -// SCompBlock compBlock; - -// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid; -// // if ((no old data) OR (no last block AND cached first key is larger than the max key)) -// if ((pIdx->offset == 0) || (pIdx->hasLast && dataColsKeyFirst(pDataCols) > pIdx->maxKey)) { -// // Append the whole block as a SUPER-BLOCK at the end -// if (pDataCols->numOfPoints >= pHelper->minRowPerFileBlock) { -// if (tsdbAppendBlockToFile(pHelper, TSDB_RW_DATAF, pDataCols, &compBlock, true) < 0) goto _err; -// } else { -// tsdb_rw_file_t ftype = (pHelper->files[TSDB_RW_LF].fd > 0) ? TSDB_RW_LF : TSDB_RW_LASTF; -// if (tsdbAppendBlockToFile(pHelper, ftype, pDataCols, &compBlock, true) < 0) goto _err; -// } -// // Copy the compBlock part to the end -// if (IS_COMPBLOCK_LOADED(pHelper)) { - -// } else { - -// } - -// pIdx->hasLast = compBlock.last; -// pIdx->len += sizeof(compBlock); -// pIdx->numOfSuperBlocks++; -// pIdx->maxKey = compBlock.keyLast; - -// rowsWritten = pDataCols->numOfPoints; -// } else { -// // Need to find a block to merge with -// int blkIdx = 0; -// // if (has last block AND cached Key is larger than the max Key) -// if (pIdx->hasLast && dataColsKeyFirst(pDataCols) > pIdx->maxKey) { -// blkIdx = pIdx->numOfSuperBlocks - 1; -// rowsWritten = tsdbMergeDataWithBlock(pHelper, pDataCols, blkIdx); -// if (rowsWritten < 0) goto _err; -// } else { -// ASSERT(IS_COMPBLOCK_LOADED(pHelper)); -// // SCompBlock *pMergeBlock = taosbsearch(); -// } -// } - -// return numOfPointsWritten; - -// _err: -// return -1; -// } - static int compareKeyBlock(const void *arg1, const void *arg2) { TSKEY key = *(TSKEY *)arg1; SCompBlock *pBlock = (SCompBlock *)arg2; @@ -947,7 +553,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa pFile = &(pHelper->files.lastF); } - if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsCanMerge, &compBlock, pCompBlock->last) < 0) goto _err; + if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsCanMerge, &compBlock, pCompBlock->last, false) < 0) goto _err; // TODO: Add the sub-block if (pCompBlock->numOfSubBlocks == 1) { @@ -985,7 +591,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa } } - if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsCanMerge, &compBlock, isLast) < 0) goto _err; + if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsCanMerge, &compBlock, isLast, true) < 0) goto _err; *pCompBlock = compBlock; -- GitLab