diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index a7016061e4e5af4afeef6182baafe62c687d7b2d..4e8afd4f0eec26b42842fd4b4ddfcff05d876a13 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -119,13 +119,15 @@ typedef struct { int maxPoints; // max number of points int numOfPoints; int numOfCols; // Total number of cols + int sversion; // TODO: set sversion void * buf; SDataCol cols[]; } SDataCols; #define keyCol(pCols) (&((pCols)->cols[0])) // Key column -#define dataColsKeyFirst(pCols) ((int64_t *)(keyCol(pCols)->pData))[0] -#define dataColsKeyLast(pCols) ((int64_t *)(keyCol(pCols)->pData))[(pCols)->numOfPoints - 1] +#define dataColsKeyAt(pCols, idx) ((int64_t *)(keyCol(pCols)->pData))[(idx)] +#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) +#define dataColsKeyLast(pCols) dataColsKeyAt(pCols, (pCols)->numOfPoints - 1) SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); void tdResetDataCols(SDataCols *pCols); diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 0f654ad0bde2bf5d2656761b8b2775f18c028dc2..7e358cc0a2479ddfeec9f5cf8aae1f29dee5be19 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -26,6 +26,7 @@ extern "C" { #endif #define TSDB_FILE_HEAD_SIZE 512 +#define TSDB_FILE_DELIMITER 0xF00AFA0F #define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3) diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 088a164933865382ae33e1b15518818c6d385e27..9a0db96bd0a4b93b410abe95ef6e4438128fbe4e 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -24,8 +24,6 @@ #include "tsdbFile.h" -#define TSDB_FILE_DELIMITER 0xF00AFA0F - const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD ".data", // TSDB_FILE_TYPE_DATA diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 5f7dd2513f312bbb5907bac4df8f700a6cfbe789..d18e766c38960b07669f6a66daac408e524c770a 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -1041,13 +1041,70 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK return 0; } +static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) { + size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols; + SCompData *pCompData = (SCompData *)malloc(size); + if (pCompData == NULL) return -1; + + pCompData->delimiter = TSDB_FILE_DELIMITER; + pCompData->uid = uid; + pCompData->numOfCols = pCols->numOfCols; + + *offset = lseek(pFile->fd, 0, SEEK_END); + *len = size; + + int toffset = size; + for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { + SCompCol *pCompCol = pCompData->cols + iCol; + SDataCol *pDataCol = pCols->cols + iCol; + + pCompCol->colId = pDataCol->colId; + pCompCol->type = pDataCol->type; + pCompCol->offset = toffset; + + // TODO: add compression + pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite; + toffset += pCompCol->len; + } + + // Write the block + if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err; + for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { + SDataCol *pDataCol = pCols->cols + iCol; + SCompCol *pCompCol = pCompData->cols + iCol; + if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err; + } + + if (pCompData == NULL) free((void *)pCompData); + return 0; + +_err: + if (pCompData == NULL) free((void *)pCompData); + return -1; +} + static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock) { STsdbCfg *pCfg = &(pRepo->config); + SCompData *pCompData = NULL; memset((void *)pCompBlock, 0, sizeof(SCompBlock)); if (pCompInfo == NULL) { - // Just need to append to file + if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file + // tsdbWriteBlockToFileImpl() + } else { // Write to .last or .l file + pCompBlock->last = 1; + + } + // pCompBlock->offset = ; + // pCompBlock->len = ; + pCompBlock->algorithm = 2; // TODO : add to configuration + pCompBlock->sversion = pCols->sversion; + pCompBlock->numOfPoints = pCols->numOfPoints; + pCompBlock->numOfSubBlocks = 1; + pCompBlock->numOfCols = pCols->numOfCols; + pCompBlock->keyFirst = dataColsKeyFirst(pCols); + pCompBlock->keyLast = dataColsKeyLast(pCols); } else { // Need to merge }