提交 253c7e64 编写于 作者: H hzcheng

first version of committing data

上级 ecd1e289
......@@ -136,6 +136,7 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop);
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
#ifdef __cplusplus
}
......
......@@ -382,6 +382,7 @@ static int tdFLenFromSchema(STSchema *pSchema) {
return ret;
}
int tdMergeDataCols(SDataCols *target, SDataCols *source) {
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
// TODO
return 0;
}
\ No newline at end of file
......@@ -90,9 +90,9 @@ typedef struct {
STable *superList; // super table list TODO: change it to list container
void *map; // table map of (uid ===> table)
void *map; // table map of (uid ===> table)
SMetaFile *mfh; // meta file handle
SMetaFile *mfh; // meta file handle
int maxRowBytes;
int maxCols;
} STsdbMeta;
......@@ -118,14 +118,14 @@ STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
#define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */
STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo);
STsdbMeta *tsdbGetMeta(tsdb_repo_t *pRepo);
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
char *getTupleKey(const void * data);
char * getTupleKey(const void *data);
// ------------------------------ TSDB CACHE INTERFACES ------------------------------
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 * 1024 * 1024 /* 16M */
......@@ -191,8 +191,8 @@ typedef struct {
} SFileInfo;
typedef struct {
int fd;
char fname[128];
int fd;
char fname[128];
SFileInfo info;
} SFile;
......@@ -216,11 +216,14 @@ typedef struct {
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles);
void tsdbCloseFileH(STsdbFileH *pFileH);
int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose);
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader,
int toClose);
SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
int tsdbOpenFile(SFile *pFile, int oflag);
int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
int tsdbCloseFile(SFile *pFile);
SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname);
#define TSDB_FGROUP_ITER_FORWARD 0
#define TSDB_FGROUP_ITER_BACKWARD 1
......@@ -265,6 +268,8 @@ typedef struct {
TSKEY keyLast;
} SCompBlock;
// Maximum number of sub-blocks a super-block can have
#define TSDB_MAX_SUBBLOCKS 8
#define IS_SUPER_BLOCK(pBlock) ((pBlock)->numOfSubBlocks >= 1)
#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0)
......@@ -276,15 +281,15 @@ typedef struct {
} SCompInfo;
#define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx))
#define TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pCompBlock, size)\
do {\
if (pCompBlock->numOfSubBlocks > 1) {\
pCompBlock = pCompInfo->blocks + pCompBlock->offset;\
size = pCompBlock->numOfSubBlocks;\
} else {\
size = 1;\
}\
} while (0)
#define TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pCompBlock, size) \
do { \
if (pCompBlock->numOfSubBlocks > 1) { \
pCompBlock = pCompInfo->blocks + pCompBlock->offset; \
size = pCompBlock->numOfSubBlocks; \
} else { \
size = 1; \
} \
} while (0)
// TODO: take pre-calculation into account
typedef struct {
......@@ -302,18 +307,11 @@ typedef struct {
SCompCol cols[];
} SCompData;
STsdbFileH* tsdbGetFile(tsdb_repo_t* pRepo);
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols);
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables);
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf);
int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf);
int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf);
int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData);
STsdbFileH *tsdbGetFile(tsdb_repo_t *pRepo);
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast,
SDataCols *pCols);
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid);
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
// TSDB repository definition
......@@ -348,6 +346,112 @@ typedef struct _tsdb_repo {
} STsdbRepo;
typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
typedef struct {
tsdb_rw_helper_t type; // helper type
int maxTables;
int maxRowSize;
int maxRows;
int maxCols;
int minRowsPerFileBlock;
int maxRowsPerFileBlock;
int8_t compress;
} SHelperCfg;
typedef struct {
int fid;
TSKEY minKey;
TSKEY maxKey;
// For read/write purpose
SFile headF;
SFile dataF;
SFile lastF;
// For write purpose only
SFile nHeadF;
SFile nLastF;
} SHelperFile;
typedef struct {
int64_t uid;
int32_t tid;
int32_t sversion;
} SHelperTable;
typedef struct {
// Global configuration
SHelperCfg config;
SHelperFile files;
SHelperTable tableInfo;
// ---------- For read purpose
int8_t state; // current loading state
SCompIdx *pCompIdx;
size_t compIdxSize;
SCompInfo *pCompInfo;
size_t compInfoSize;
int blockIter; // For write purpose
SCompData *pCompData;
size_t compDataSize;
SDataCols *pDataCols[2];
// ---------- For read purpose
bool hasLast;
int newBlocks;
SCompIdx *pWCompIdx;
size_t wCompIdxSize;
SCompInfo *pWCompInfo;
size_t wCompInfoSize;
SCompData *pWCompData;
size_t wCompDataSize;
} SRWHelper;
// --------- Helper state
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
#define TSDB_HELPER_FILE_SET 0x1 // File is set
#define TSDB_HELPER_FILE_OPEN 0x2 // File is opened
#define TSDB_HELPER_IDX_LOAD 0x4 // SCompIdx part is loaded
#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
#define TSDB_HELPER_TYPE(h) ((h)->config.type)
#define helperSetState(h, s) (((h)->state) |= (s))
#define helperClearState(h, s) ((h)->state &= (~(s)))
#define helperHasState(h, s) ((((h)->state) & (s)) == (s))
int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg);
void tsdbDestroyHelper(SRWHelper *pHelper);
void tsdbClearHelper(SRWHelper *pHelper);
// --------- For set operations
int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup);
int tsdbOpenHelperFile(SRWHelper *pHelper);
void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema);
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError);
// --------- For read operations
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target);
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target);
int tsdbLoadCompData(SRWHelper *pHelper, int blkIdx, void *target);
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int32_t *colIds, int numOfColIds);
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
// --------- For write operations
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols);
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper);
int tsdbWriteCompInfo(SRWHelper *pHelper);
int tsdbWriteCompIdx(SRWHelper *pHelper);
#ifdef __cplusplus
}
......
......@@ -21,6 +21,7 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <libgen.h>
#include "tutil.h"
#include "tsdbMain.h"
......@@ -33,7 +34,6 @@ const char *tsdbFileSuffix[] = {
static int compFGroupKey(const void *key, const void *fgroup);
static int compFGroup(const void *arg1, const void *arg2);
static int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname);
static int tsdbWriteFileHead(SFile *pFile);
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid);
......@@ -91,24 +91,36 @@ static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) {
return 0;
}
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1;
/**
* Create the file group if the file group not exists.
*
* @return A pointer to
*/
SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
if (pFileH->numOfFGroups >= pFileH->maxFGroups) return NULL;
SFileGroup fGroup;
SFileGroup *pFGroup = &fGroup;
if (tsdbSearchFGroup(pFileH, fid) == NULL) { // if not exists, create one
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid);
if (pGroup == NULL) { // if not exists, create one
pFGroup->fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0) {
// TODO: deal with the ERROR here, remove those creaed file
return -1;
}
if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]),
type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0)
goto _err;
}
pFileH->fGroup[pFileH->numOfFGroups++] = fGroup;
qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup);
return tsdbSearchFGroup(pFileH, fid);
}
return 0;
return pGroup;
_err:
// TODO: deal with the err here
return NULL;
}
int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
......@@ -181,27 +193,27 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
return ret;
}
int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) {
SCompBlock *pBlock = pStartBlock;
for (int i = 0; i < numOfBlocks; i++) {
if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1;
pCols->numOfPoints += (pCompData->cols[0].len / 8);
for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) {
SCompCol *pCompCol = &(pCompData->cols[iCol]);
// pCols->numOfPoints += pBlock->numOfPoints;
int k = 0;
for (; k < pCols->numOfCols; k++) {
if (pCompCol->colId == pCols->cols[k].colId) break;
}
if (tsdbLoadColData(pFile, pCompCol, pBlock->offset,
(void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0)
return -1;
}
pStartBlock++;
}
return 0;
}
// int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) {
// SCompBlock *pBlock = pStartBlock;
// for (int i = 0; i < numOfBlocks; i++) {
// if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1;
// pCols->numOfPoints += (pCompData->cols[0].len / 8);
// for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) {
// SCompCol *pCompCol = &(pCompData->cols[iCol]);
// // pCols->numOfPoints += pBlock->numOfPoints;
// int k = 0;
// for (; k < pCols->numOfCols; k++) {
// if (pCompCol->colId == pCols->cols[k].colId) break;
// }
// if (tsdbLoadColData(pFile, pCompCol, pBlock->offset,
// (void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0)
// return -1;
// }
// pStartBlock++;
// }
// return 0;
// }
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols) {
SCompBlock *pSuperBlock = TSDB_COMPBLOCK_AT(pCompInfo, idx);
......@@ -237,42 +249,42 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf
return 0;
}
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) {
SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
// int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) {
// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
// if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1;
// TODO: need to check the correctness
return 0;
}
// if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1;
// // TODO: need to check the correctness
// return 0;
// }
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) {
SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
// int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) {
// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1;
// if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1;
if (read(pFile->fd, buf, pIdx->len) < 0) return -1;
// if (read(pFile->fd, buf, pIdx->len) < 0) return -1;
// TODO: need to check the correctness
// // TODO: need to check the correctness
return 0;
}
// return 0;
// }
int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) {
// assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
// int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) {
// // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1;
size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols;
if (read(pFile->fd, buf, size) < 0) return -1;
// if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1;
// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols;
// if (read(pFile->fd, buf, size) < 0) return -1;
return 0;
}
// return 0;
// }
int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) {
if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1;
if (read(pFile->fd, buf, pCol->len) < 0) return -1;
return 0;
}
// int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) {
// if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1;
// if (read(pFile->fd, buf, pCol->len) < 0) return -1;
// return 0;
// }
static int compFGroupKey(const void *key, const void *fgroup) {
int fid = *(int *)key;
......@@ -317,7 +329,7 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
return 0;
}
static int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) {
int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) {
if (dataDir == NULL || fname == NULL) return -1;
sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix);
......
......@@ -55,11 +55,11 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
static void * tsdbCommitData(void *arg);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols);
static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, SDataCols *pDataCols);
static TSKEY tsdbNextIterKey(SSkipListIterator *pIter);
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len,
int64_t uid);
// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len,
// int64_t uid);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
......@@ -751,6 +751,8 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
}
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
if (pIter == NULL) return 0;
int numOfRows = 0;
do {
......@@ -811,7 +813,8 @@ static void *tsdbCommitData(void *arg) {
STsdbRepo * pRepo = (STsdbRepo *)arg;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbCache *pCache = pRepo->tsdbCache;
STsdbCfg * pCfg = &(pRepo->config);
STsdbCfg * pCfg = &(pRepo->config);
SDataCols * pDataCols = NULL;
if (pCache->imem == NULL) return NULL;
// Create the iterator to read from cache
......@@ -821,24 +824,34 @@ static void *tsdbCommitData(void *arg) {
return NULL;
}
// Create a data column buffer for commit
SDataCols *pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock);
if (pDataCols == NULL) {
// TODO: deal with the error
return NULL;
}
// Create a write helper for commit data
SRWHelper whelper;
SHelperCfg hcfg = {
.type = TSDB_WRITE_HELPER,
.maxTables = pCfg->maxTables,
.maxRowSize = pMeta->maxRowBytes,
.maxRows = pCfg->maxRowsPerFileBlock,
.maxCols = pMeta->maxCols,
.minRowsPerFileBlock = pCfg->minRowsPerFileBlock,
.compress = 2 // TODO make it a configuration
};
if (tsdbInitHelper(&whelper, &hcfg) < 0) goto _exit;
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) goto _exit;
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, pDataCols) < 0) {
// TODO: deal with the error here
// assert(0);
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
ASSERT(false);
goto _exit;
}
}
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyHelper(&whelper);
tsdbDestroyTableIters(iters, pCfg->maxTables);
tsdbLockRepo(arg);
......@@ -849,7 +862,7 @@ static void *tsdbCommitData(void *arg) {
// TODO: free the skiplist
for (int i = 0; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->imem) { // Here has memory leak
if (pTable && pTable->imem) { // Here has memory leak
pTable->imem = NULL;
}
}
......@@ -858,19 +871,12 @@ static void *tsdbCommitData(void *arg) {
return NULL;
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) {
int isNewLastFile = 0;
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, SDataCols *pDataCols) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &pRepo->config;
SFile hFile, lFile;
SFileGroup *pGroup = NULL;
SCompIdx * pIndices = NULL;
SCompInfo * pCompInfo = NULL;
// size_t compInfoSize = 0;
// SCompBlock compBlock;
// SCompBlock *pBlock = &compBlock;
TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
......@@ -879,334 +885,212 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
if (!hasDataToCommit) return 0; // No data to commit, just return
// TODO: make it more flexible
pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + sizeof(SCompBlock) * 1000);
// Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir);
if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */
}
pGroup = tsdbOpenFilesForCommit(pFileH, fid);
if (pGroup == NULL) { /* TODO */
}
tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 1);
tsdbOpenFile(&hFile, O_RDWR);
if (0 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
// TODO: make it not to write the last file every time
tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
isNewLastFile = 1;
}
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) goto _err;
// Load the SCompIdx
pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
if (pIndices == NULL) { /* TODO*/
}
if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */
}
// Set the file to write/read
tsdbSetHelperFile(pHelper, pGroup);
lseek(hFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET);
// Open files for write/read
if (tsdbOpenHelperFile(pHelper) < 0) goto _err;
// Loop to commit data in each table
for (int tid = 0; tid < pCfg->maxTables; tid++) {
STable * pTable = pMeta->tables[tid];
SSkipListIterator *pIter = iters[tid];
SCompIdx * pIdx = &pIndices[tid];
int nNewBlocks = 0;
if (pTable == NULL || pIter == NULL) continue;
/* If no new data to write for this table, just write the old data to new file
* if there are.
*/
if (!tsdbHasDataInRange(pIter, minKey, maxKey)) {
// has old data
if (pIdx->len > 0) {
goto _table_over;
// if (isNewLastFile && pIdx->hasLast) {
if (0) {
// need to move the last block to new file
if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */
}
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
}
tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable));
SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks);
int nBlocks = 0;
TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks);
SCompData tBlock;
int64_t toffset;
int32_t tlen;
tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock);
tsdbWriteBlockToFileImpl(&lFile, pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid);
pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks);
pTBlock->offset = toffset;
pTBlock->len = tlen;
pTBlock->numOfPoints = pCols->numOfPoints;
pTBlock->numOfSubBlocks = 1;
pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
if (nBlocks > 1) {
pIdx->len -= (sizeof(SCompBlock) * nBlocks);
}
write(hFile.fd, (void *)pCompInfo, pIdx->len);
} else {
pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
hFile.info.size += pIdx->len;
}
}
continue;
}
pCompInfo->delimiter = TSDB_FILE_DELIMITER;
pCompInfo->checksum = 0;
pCompInfo->uid = pTable->tableId.uid;
// Load SCompBlock part if neccessary
// int isCompBlockLoaded = 0;
if (0) {
// if (pIdx->offset > 0) {
if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) {
// has last block || cache key overlap with commit key
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100);
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
}
// if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1;
} else {
// TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part
// and write those new blocks to it
}
}
tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable));
SHelperTable hTable = {.uid = pTable->tableId.uid, .tid = pTable->tableId.tid, .sversion = pTable->sversion};
tsdbSetHelperTable(pHelper, &hTable, tsdbGetTableSchema(pMeta, pTable));
tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable));
// Loop to write the data in the cache to files, if no data to write, just break
// the loop
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
while (1) {
tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols);
if (pCols->numOfPoints == 0) break;
int pointsWritten = pCols->numOfPoints;
// TODO: all write to the end of .data file
int64_t toffset = 0;
int32_t tlen = 0;
tsdbWriteBlockToFileImpl(&pGroup->files[TSDB_FILE_TYPE_DATA], pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid);
// Make the compBlock
SCompBlock *pTBlock = pCompInfo->blocks + nNewBlocks++;
pTBlock->offset = toffset;
pTBlock->len = tlen;
pTBlock->keyFirst = dataColsKeyFirst(pCols);
pTBlock->keyLast = dataColsKeyLast(pCols);
pTBlock->last = 0;
pTBlock->algorithm = 0;
pTBlock->numOfPoints = pCols->numOfPoints;
pTBlock->sversion = pTable->sversion;
pTBlock->numOfSubBlocks = 1;
pTBlock->numOfCols = pCols->numOfCols;
if (dataColsKeyLast(pCols) > pIdx->maxKey) pIdx->maxKey = dataColsKeyLast(pCols);
tdPopDataColsPoints(pCols, pointsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints;
}
while (true) {
int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols);
ASSERT(rowsRead >= 0);
if (pDataCols->numOfPoints == 0) break;
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
if (rowsWritten < 0) goto _err;
assert(rowsWritten <= pDataCols->numOfPoints);
_table_over:
// Write the SCompBlock part
pIdx->offset = lseek(hFile.fd, 0, SEEK_END);
if (pIdx->len > 0) {
int bytes = tsendfile(hFile.fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len);
if (bytes < pIdx->len) {
printf("Failed to send file, reason: %s\n", strerror(errno));
}
if (nNewBlocks > 0) {
write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks);
pIdx->len += (sizeof(SCompBlock) * nNewBlocks);
}
} else {
if (nNewBlocks > 0) {
write(hFile.fd, (void *)pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks);
pIdx->len += sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks;
}
tdPopDataColsPoints(pDataCols, rowsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfPoints;
}
pIdx->checksum = 0;
pIdx->numOfSuperBlocks += nNewBlocks;
pIdx->hasLast = 0;
}
// Write the SCompIdx part
if (lseek(hFile.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {/* TODO */}
if (write(hFile.fd, (void *)pIndices, sizeof(SCompIdx) * pCfg->maxTables) < 0) {/* TODO */}
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) goto _err;
// close the files
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbCloseFile(&pGroup->files[type]);
}
tsdbCloseFile(&hFile);
if (isNewLastFile) tsdbCloseFile(&lFile);
// TODO: replace the .head and .last file
rename(hFile.fname, pGroup->files[TSDB_FILE_TYPE_HEAD].fname);
pGroup->files[TSDB_FILE_TYPE_HEAD].info = hFile.info;
if (isNewLastFile) {
rename(lFile.fname, pGroup->files[TSDB_FILE_TYPE_LAST].fname);
pGroup->files[TSDB_FILE_TYPE_LAST].info = lFile.info;
// Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) goto _err;
}
if (pIndices) free(pIndices);
if (pCompInfo) free(pCompInfo);
if (tsdbWriteCompIdx(pHelper) < 0) goto _err;
tsdbCloseHelperFile(pHelper, 0);
// TODO: make it atomic with some methods
pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF;
pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF;
pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF;
return 0;
_err:
tsdbCloseHelperFile(pHelper, 1);
return -1;
}
static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey) {
if (pIter == NULL) return 0;
/**
* Return the next iterator key.
*
* @return the next key if iter has
* -1 if iter not
*/
static TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
if (pIter == NULL) return -1;
SSkipListNode *node = tSkipListIterGet(pIter);
if (node == NULL) return 0;
if (node == NULL) return -1;
SDataRow row = SL_GET_NODE_DATA(node);
if (dataRowKey(row) >= minKey && dataRowKey(row) <= maxKey) return 1;
return 0;
return dataRowKey(row);
}
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) {
TSKEY nextKey;
for (int i = 0; i < nIters; i++) {
SSkipListIterator *pIter = iters[i];
if (tsdbHasDataInRange(pIter, minKey, maxKey)) return 1;
nextKey = tsdbNextIterKey(pIter);
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
}
return 0;
}
static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) {
size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols;
SCompData *pCompData = (SCompData *)malloc(size);
if (pCompData == NULL) return -1;
// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) {
// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols;
// SCompData *pCompData = (SCompData *)malloc(size);
// if (pCompData == NULL) return -1;
pCompData->delimiter = TSDB_FILE_DELIMITER;
pCompData->uid = uid;
pCompData->numOfCols = pCols->numOfCols;
// pCompData->delimiter = TSDB_FILE_DELIMITER;
// pCompData->uid = uid;
// pCompData->numOfCols = pCols->numOfCols;
*offset = lseek(pFile->fd, 0, SEEK_END);
*len = size;
// *offset = lseek(pFile->fd, 0, SEEK_END);
// *len = size;
int toffset = size;
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SCompCol *pCompCol = pCompData->cols + iCol;
SDataCol *pDataCol = pCols->cols + iCol;
// int toffset = size;
// for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
// SCompCol *pCompCol = pCompData->cols + iCol;
// SDataCol *pDataCol = pCols->cols + iCol;
pCompCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type;
pCompCol->offset = toffset;
// TODO: add compression
pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite;
toffset += pCompCol->len;
}
// Write the block
if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err;
*len += size;
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *pDataCol = pCols->cols + iCol;
SCompCol *pCompCol = pCompData->cols + iCol;
if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err;
*len += pCompCol->len;
}
if (pCompData == NULL) free((void *)pCompData);
return 0;
_err:
if (pCompData == NULL) free((void *)pCompData);
return -1;
}
static int compareKeyBlock(const void *arg1, const void *arg2) {
TSKEY key = *(TSKEY *)arg1;
SCompBlock *pBlock = (SCompBlock *)arg2;
if (key < pBlock->keyFirst) {
return -1;
} else if (key > pBlock->keyLast) {
return 1;
}
return 0;
}
int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) {
STsdbCfg * pCfg = &(pRepo->config);
SFile * pFile = NULL;
int numOfPointsToWrite = 0;
int64_t offset = 0;
int32_t len = 0;
memset((void *)pCompBlock, 0, sizeof(SCompBlock));
if (pCompInfo == NULL) {
// Just append the data block to .data or .l or .last file
numOfPointsToWrite = pCols->numOfPoints;
if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file
pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]);
} else { // Write to .last or .l file
pCompBlock->last = 1;
if (lFile) {
pFile = lFile;
} else {
pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]);
}
}
tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid);
pCompBlock->offset = offset;
pCompBlock->len = len;
pCompBlock->algorithm = 2; // TODO : add to configuration
pCompBlock->sversion = pCols->sversion;
pCompBlock->numOfPoints = pCols->numOfPoints;
pCompBlock->numOfSubBlocks = 1;
pCompBlock->numOfCols = pCols->numOfCols;
pCompBlock->keyFirst = dataColsKeyFirst(pCols);
pCompBlock->keyLast = dataColsKeyLast(pCols);
} else {
// Need to merge the block to either the last block or the other block
TSKEY keyFirst = dataColsKeyFirst(pCols);
SCompBlock *pMergeBlock = NULL;
// Search the block to merge in
void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks,
compareKeyBlock, TD_GE);
if (ptr == NULL) {
// No block greater or equal than the key, but there are data in the .last file, need to merge the last file block
// and merge the data
pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1);
} else {
pMergeBlock = (SCompBlock *)ptr;
}
if (pMergeBlock->last) {
if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) {
// Need to load the data from .last and combine data in pCols to write to .data file
// pCompCol->colId = pDataCol->colId;
// pCompCol->type = pDataCol->type;
// pCompCol->offset = toffset;
// // TODO: add compression
// pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite;
// toffset += pCompCol->len;
// }
// // Write the block
// if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err;
// *len += size;
// for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
// SDataCol *pDataCol = pCols->cols + iCol;
// SCompCol *pCompCol = pCompData->cols + iCol;
// if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err;
// *len += pCompCol->len;
// }
// if (pCompData == NULL) free((void *)pCompData);
// return 0;
} else { // Just append the block to .last or .l file
if (lFile) {
// read the block from .last file and merge with pCols, write to .l file
// _err:
// if (pCompData == NULL) free((void *)pCompData);
// return -1;
// }
} else {
// tsdbWriteBlockToFileImpl();
}
}
} else { // The block need to merge in .data file
// static int compareKeyBlock(const void *arg1, const void *arg2) {
// TSKEY key = *(TSKEY *)arg1;
// SCompBlock *pBlock = (SCompBlock *)arg2;
}
// if (key < pBlock->keyFirst) {
// return -1;
// } else if (key > pBlock->keyLast) {
// return 1;
// }
}
// return 0;
// }
return numOfPointsToWrite;
}
// int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) {
// STsdbCfg * pCfg = &(pRepo->config);
// SFile * pFile = NULL;
// int numOfPointsToWrite = 0;
// int64_t offset = 0;
// int32_t len = 0;
// memset((void *)pCompBlock, 0, sizeof(SCompBlock));
// if (pCompInfo == NULL) {
// // Just append the data block to .data or .l or .last file
// numOfPointsToWrite = pCols->numOfPoints;
// if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file
// pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]);
// } else { // Write to .last or .l file
// pCompBlock->last = 1;
// if (lFile) {
// pFile = lFile;
// } else {
// pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]);
// }
// }
// tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid);
// pCompBlock->offset = offset;
// pCompBlock->len = len;
// pCompBlock->algorithm = 2; // TODO : add to configuration
// pCompBlock->sversion = pCols->sversion;
// pCompBlock->numOfPoints = pCols->numOfPoints;
// pCompBlock->numOfSubBlocks = 1;
// pCompBlock->numOfCols = pCols->numOfCols;
// pCompBlock->keyFirst = dataColsKeyFirst(pCols);
// pCompBlock->keyLast = dataColsKeyLast(pCols);
// } else {
// // Need to merge the block to either the last block or the other block
// TSKEY keyFirst = dataColsKeyFirst(pCols);
// SCompBlock *pMergeBlock = NULL;
// // Search the block to merge in
// void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks,
// compareKeyBlock, TD_GE);
// if (ptr == NULL) {
// // No block greater or equal than the key, but there are data in the .last file, need to merge the last file block
// // and merge the data
// pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1);
// } else {
// pMergeBlock = (SCompBlock *)ptr;
// }
// if (pMergeBlock->last) {
// if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) {
// // Need to load the data from .last and combine data in pCols to write to .data file
// } else { // Just append the block to .last or .l file
// if (lFile) {
// // read the block from .last file and merge with pCols, write to .l file
// } else {
// // tsdbWriteBlockToFileImpl();
// }
// }
// } else { // The block need to merge in .data file
// }
// }
// return numOfPointsToWrite;
// }
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbMain.h"
#define adjustMem(ptr, size, expectedSize) \
do { \
if ((size) < (expectedSize)) { \
(ptr) = realloc((void *)(ptr), (expectedSize)); \
if ((ptr) == NULL) return -1; \
(size) = (expectedSize); \
} \
} while (0)
// Local function definitions
static int tsdbCheckHelperCfg(SHelperCfg *pCfg);
static void tsdbInitHelperFile(SHelperFile *pHFile);
static int tsdbInitHelperRead(SRWHelper *pHelper);
static int tsdbInitHelperWrite(SRWHelper *pHelper);
static void tsdbClearHelperFile(SHelperFile *pHFile);
static void tsdbDestroyHelperRead(SRWHelper *pHelper);
static void tsdbDestroyHelperWrite(SRWHelper *pHelper);
static void tsdbClearHelperRead(SRWHelper *pHelper);
static void tsdbClearHelperWrite(SRWHelper *pHelper);
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
bool isLast);
static int compareKeyBlock(const void *arg1, const void *arg2);
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
static int nRowsLEThan(SDataCols *pDataCols, int maxKey);
static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg) {
if (pHelper == NULL || pCfg == NULL || tsdbCheckHelperCfg(pCfg) < 0) return -1;
memset((void *)pHelper, 0, sizeof(*pHelper));
pHelper->config = *pCfg;
tsdbInitHelperFile(&(pHelper->files));
if (tsdbInitHelperRead(pHelper) < 0) goto _err;
if ((TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) && tsdbInitHelperWrite(pHelper) < 0) goto _err;
pHelper->state = TSDB_HELPER_CLEAR_STATE;
return 0;
_err:
tsdbDestroyHelper(pHelper);
return -1;
}
void tsdbDestroyHelper(SRWHelper *pHelper) {
if (pHelper == NULL) return;
tsdbClearHelperFile(&(pHelper->files));
tsdbDestroyHelperRead(pHelper);
tsdbDestroyHelperWrite(pHelper);
}
void tsdbClearHelper(SRWHelper *pHelper) {
if (pHelper == NULL) return;
tsdbClearHelperFile(&(pHelper->files));
tsdbClearHelperRead(pHelper);
tsdbClearHelperWrite(pHelper);
}
int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// TODO: reset the helper object
pHelper->files.fid = pGroup->fileId;
pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) {
char *fnameDup = strdup(pHelper->files.headF.fname);
if (fnameDup == NULL) return -1;
char *dataDir = dirname(fnameDup);
tsdbGetFileName(dataDir, pHelper->files.fid, ".h", pHelper->files.nHeadF.fname);
tsdbGetFileName(dataDir, pHelper->files.fid, ".l", pHelper->files.nLastF.fname);
free((void *)fnameDup);
}
return 0;
}
int tsdbOpenHelperFile(SRWHelper *pHelper) {
// TODO: check if the file is set
{}
if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) {
if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err;
if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err;
// TODO: need to write head and compIdx part
if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) goto _err;
if (tsdbShouldCreateNewLast(pHelper)) {
if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
}
} else {
if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err;
}
return 0;
_err:
tsdbCloseHelperFile(pHelper, true);
return -1;
}
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (pHelper->files.headF.fd > 0) {
close(pHelper->files.headF.fd);
pHelper->files.headF.fd = -1;
}
if (pHelper->files.dataF.fd > 0) {
close(pHelper->files.dataF.fd);
pHelper->files.dataF.fd = -1;
}
if (pHelper->files.lastF.fd > 0) {
close(pHelper->files.lastF.fd);
pHelper->files.lastF.fd = -1;
}
if (pHelper->files.nHeadF.fd > 0) {
close(pHelper->files.nHeadF.fd);
pHelper->files.nHeadF.fd = -1;
if (hasError) remove(pHelper->files.nHeadF.fname);
}
if (pHelper->files.nLastF.fd > 0) {
close(pHelper->files.nLastF.fd);
pHelper->files.nLastF.fd = -1;
if (hasError) remove(pHelper->files.nLastF.fname);
}
return 0;
}
void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema) {
// TODO: check if it is available to set the table
pHelper->tableInfo = *pHelperTable;
// TODO: Set the pDataCols according to schema
// TODO: set state
}
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET) && helperHasState(pHelper, TSDB_HELPER_FILE_OPEN));
SCompBlock compBlock;
int rowsToWrite = 0;
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
// Load SCompIdx part if not loaded yet
if ((!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) && (tsdbLoadCompIdx(pHelper, NULL) < 0)) goto _err;
// Load the SCompInfo part if neccessary
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
if ((pIdx->offset > 0) && (pIdx->hasLast || dataColsKeyFirst(pDataCols) <= pIdx->maxKey)) {
if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
}
SCompIdx *pWIdx = pHelper->pWCompIdx + pHelper->tableInfo.tid;
if (!pIdx->hasLast && keyFirst > pIdx->maxKey) {
// Just need to append as a super block
rowsToWrite = pDataCols->numOfPoints;
SFile *pWFile = NULL;
bool isLast = false;
if (rowsToWrite > pHelper->config.minRowsPerFileBlock) {
pWFile = &(pHelper->files.dataF);
} else {
isLast = true;
pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.nLastF);
}
if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast) < 0) goto _err;
// TODO: may need to reallocate the memory
pHelper->pCompInfo->blocks[pHelper->blockIter++] = compBlock;
pIdx->hasLast = compBlock.last;
pIdx->numOfSuperBlocks++;
pIdx->maxKey = dataColsKeyLast(pDataCols);
// pIdx->len = ??????
} else { // (pIdx->hasLast) OR (keyFirst <= pIdx->maxKey)
if (keyFirst > pIdx->maxKey) {
int blkIdx = pIdx->numOfSuperBlocks - 1;
ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[blkIdx].last);
// Need to merge with the last block
if (tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols) < 0) goto _err;
} else {
// Find the first block greater or equal to the block
SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks),
pIdx->numOfSuperBlocks, sizeof(SCompBlock), compareKeyBlock, TD_GE);
if (pCompBlock == NULL) {
if (tsdbMergeDataWithBlock(pHelper, pIdx->numOfSuperBlocks-1, pDataCols) < 0) goto _err;
} else {
if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) {
SCompBlock *pNextBlock = NULL;
TSKEY keyLimit = (pNextBlock == NULL) ? INT_MAX : (pNextBlock->keyFirst - 1);
rowsToWrite =
MIN(nRowsLEThan(pDataCols, keyLimit), pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints);
if (tsdbMergeDataWithBlock(pHelper, pCompBlock-pHelper->pCompInfo->blocks, pDataCols) < 0) goto _err;
} else {
// There options: 1. merge with previous block
// 2. commit as one block
// 3. merge with current block
int nRows1 = INT_MAX;
int nRows2 = nRowsLEThan(pDataCols, pCompBlock->keyFirst);
int nRows3 = MIN(nRowsLEThan(pDataCols, (pCompBlock + 1)->keyFirst), (pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints));
// TODO: find the block with max rows can merge
if (tsdbMergeDataWithBlock(pHelper, pCompBlock, pDataCols) < 0) goto _err;
}
}
}
}
return rowsToWrite;
_err:
return -1;
}
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
// TODO
return 0;
}
int tsdbWriteCompInfo(SRWHelper *pHelper) {
// TODO
return 0;
}
int tsdbWriteCompIdx(SRWHelper *pHelper) {
// TODO
return 0;
}
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
// TODO: check helper state
ASSERT(!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
int fd = pHelper->files.headF.fd;
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
if (tread(fd, pHelper->pCompIdx, pHelper->compIdxSize) < pHelper->compIdxSize) return -1;
// TODO: check the checksum
if (target) memcpy(target, pHelper->pCompIdx, pHelper->compIdxSize);
helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);
return 0;
}
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(pIdx->offset > 0);
int fd = pHelper->files.headF.fd;
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1;
ASSERT(pIdx->len > 0);
adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, pIdx->len);
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < 0) return -1;
// TODO: check the checksum
// TODO: think about when target has no space for the content
if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len);
helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
return 0;
}
int tsdbLoadCompData(SRWHelper *pHelper, int blkIdx, void *target) {
// TODO
return 0;
}
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int32_t *colIds, int numOfColIds) {
// TODO
return 0;
}
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
// TODO
return 0;
}
static int tsdbCheckHelperCfg(SHelperCfg *pCfg) {
// TODO
return 0;
}
static void tsdbInitHelperFile(SHelperFile *pHFile) {
pHFile->fid = -1;
pHFile->headF.fd = -1;
pHFile->dataF.fd = -1;
pHFile->lastF.fd = -1;
pHFile->nHeadF.fd = -1;
pHFile->nLastF.fd = -1;
}
static void tsdbClearHelperFile(SHelperFile *pHFile) {
pHFile->fid = -1;
if (pHFile->headF.fd > 0) {
close(pHFile->headF.fd);
pHFile->headF.fd = -1;
}
if (pHFile->dataF.fd > 0) {
close(pHFile->dataF.fd);
pHFile->dataF.fd = -1;
}
if (pHFile->lastF.fd > 0) {
close(pHFile->lastF.fd);
pHFile->lastF.fd = -1;
}
if (pHFile->nHeadF.fd > 0) {
close(pHFile->nHeadF.fd);
pHFile->nHeadF.fd = -1;
}
if (pHFile->nLastF.fd > 0) {
close(pHFile->nLastF.fd);
pHFile->nLastF.fd = -1;
}
}
static int tsdbInitHelperRead(SRWHelper *pHelper) {
SHelperCfg *pCfg = &(pHelper->config);
pHelper->compIdxSize = pCfg->maxTables * sizeof(SCompIdx);
if ((pHelper->pCompIdx = (SCompIdx *)malloc(pHelper->compIdxSize)) == NULL) return -1;
return 0;
}
static void tsdbDestroyHelperRead(SRWHelper *pHelper) {
tfree(pHelper->pCompIdx);
pHelper->compIdxSize = 0;
tfree(pHelper->pCompInfo);
pHelper->compInfoSize = 0;
tfree(pHelper->pCompData);
pHelper->compDataSize = 0;
tdFreeDataCols(pHelper->pDataCols[0]);
tdFreeDataCols(pHelper->pDataCols[1]);
}
static int tsdbInitHelperWrite(SRWHelper *pHelper) {
SHelperCfg *pCfg = &(pHelper->config);
pHelper->wCompIdxSize = pCfg->maxTables * sizeof(SCompIdx);
if ((pHelper->pWCompIdx = (SCompIdx *)malloc(pHelper->wCompIdxSize)) == NULL) return -1;
return 0;
}
static void tsdbDestroyHelperWrite(SRWHelper *pHelper) {
tfree(pHelper->pWCompIdx);
pHelper->wCompIdxSize = 0;
tfree(pHelper->pWCompInfo);
pHelper->wCompInfoSize = 0;
tfree(pHelper->pWCompData);
pHelper->wCompDataSize = 0;
}
static void tsdbClearHelperRead(SRWHelper *pHelper) {
// TODO
}
static void tsdbClearHelperWrite(SRWHelper *pHelper) {
// TODO
}
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
// TODO
return 0;
}
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
bool isLast) {
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints);
int64_t offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) goto _err;
SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols);
if (pCompData == NULL) goto _err;
int nColsNotAllNull = 0;
int32_t toffset;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
SDataCol *pDataCol = pDataCols->cols + ncol;
SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;
if (0) {
// TODO: all data are NULL
continue;
}
pCompCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type;
pCompCol->len = pDataCol->len;
pCompCol->offset = toffset;
nColsNotAllNull++;
toffset += pCompCol->len;
}
pCompData->delimiter = TSDB_FILE_DELIMITER;
pCompData->uid = pHelper->tableInfo.uid;
pCompData->numOfCols = nColsNotAllNull;
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull;
if (twrite(pFile->fd, (void *)pCompData, tsize) < tsize) goto _err;
for (int i = 0; i < pDataCols->numOfCols; i++) {
SDataCol *pDataCol = pCompData->cols + i;
SCompCol *pCompCol = NULL;
if (twrite(pFile->fd, (void *)(pDataCol->pData), pCompCol->len) < pCompCol->len) goto _err;
}
pCompBlock->last = isLast;
pCompBlock->offset = offset;
// pCOmpBlock->algorithm = ;
pCompBlock->numOfPoints = rowsToWrite;
pCompBlock->sversion = pHelper->tableInfo.sversion;
// pCompBlock->len = ;
// pCompBlock->numOfSubBlocks = ;
pCompBlock->numOfCols = nColsNotAllNull;
// pCompBlock->keyFirst = ;
// pCompBlock->keyLast = ;
return 0;
_err:
return -1;
}
// static int compareKeyBlock(const void *arg1, const void *arg2);
// /**
// * Init a read-write helper object for read or write usage.
// */
// int tsdbInitHelper(SRWHelper *pHelper, int maxTables, tsdb_rwhelper_t type, int maxRowSize, int maxRows,
// int maxCols) {
// if (pHelper == NULL) return -1;
// memset((void *)pHelper, 0, sizeof(SRWHelper));
// for (int ftype = TSDB_RW_HEADF; ftype <= TSDB_RW_LF; ftype++) {
// pHelper->files[ftype] = -1;
// }
// // Set type
// pHelper->type = type;
// // Set global configuration
// pHelper->maxTables = maxTables;
// pHelper->maxRowSize = maxRowSize;
// pHelper->maxRows = maxRows;
// pHelper->maxCols = maxCols;
// // Allocate SCompIdx part memory
// pHelper->compIdxSize = sizeof(SCompIdx) * maxTables;
// pHelper->pCompIdx = (SCompIdx *)malloc(pHelper->compIdxSize);
// if (pHelper->pCompIdx == NULL) goto _err;
// pHelper->compDataSize = sizeof(SCompData) + sizeof(SCompCol) * maxCols;
// pHelper->pCompData = (SCompData *)malloc(pHelper->compDataSize);
// pHelper->pDataCols = tdNewDataCols(maxRowSize, maxCols, maxRows);
// if (pHelper->pDataCols == NULL) goto _err;
// return 0;
// _err:
// tsdbDestroyHelper(pHelper);
// return -1;
// }
// void tsdbResetHelper(SRWHelper *pHelper) {
// if (pHelper->headF.fd > 0) {
// close(pHelper->headF.fd);
// pHelper->headF.fd = -1;
// }
// if (pHelper->dataF.fd > 0) {
// close(pHelper->dataF.fd);
// pHelper->dataF.fd = -1;
// }
// if (pHelper->lastF.fd > 0) {
// close(pHelper->lastF.fd);
// pHelper->lastF.fd = -1;
// }
// if (pHelper->hF.fd > 0) {
// close(pHelper->hF.fd);
// pHelper->hF.fd = -1;
// }
// if (pHelper->lF.fd > 0) {
// close(pHelper->lF.fd);
// pHelper->lF.fd = -1;
// }
// pHelper->state = 0;
// tdResetDataCols(pHelper->pDataCols);
// }
// int tsdbDestroyHelper(SRWHelper *pHelper) {
// if (pHelper->headF.fd > 0) close(pHelper->headF.fd);
// if (pHelper->dataF.fd > 0) close(pHelper->dataF.fd);
// if (pHelper->lastF.fd > 0) close(pHelper->lastF.fd);
// if (pHelper->hF.fd > 0) close(pHelper->hF.fd);
// if (pHelper->lF.fd > 0) close(pHelper->lF.fd);
// if (pHelper->pCompIdx) free(pHelper->pCompIdx);
// if (pHelper->pCompInfo) free(pHelper->pCompInfo);
// if (pHelper->pCompData) free(pHelper->pCompData);
// memset((void *)pHelper, 0, sizeof(SRWHelper));
// return 0;
// }
// int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// if (pHelper->state != 0) return -1;
// pHelper->fid = pGroup->fileId;
// pHelper->headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
// pHelper->headF.fd = -1;
// pHelper->dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
// pHelper->dataF.fd = -1;
// pHelper->lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
// pHelper->lastF.fd = -1;
// if (pHelper->mode == TSDB_WRITE_HELPER) {
// char *fnameCpy = strdup(pHelper->headF.fname);
// if (fnameCpy == NULL) return -1;
// char *dataDir = dirname(fnameCpy);
// memset((void *)(&pHelper->hF), 0, sizeof(SFile));
// memset((void *)(&pHelper->lF), 0, sizeof(SFile));
// pHelper->hF.fd = -1;
// pHelper->lF.fd = -1;
// tsdbGetFileName(dataDir, pHelper->fid, ".h", pHelper->hF.fname);
// tsdbGetFileName(dataDir, pHelper->fid, ".l", pHelper->lF.fname);
// free((char *)fnameCpy);
// }
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_FILE_SET);
// return 0;
// }
// static int tsdbNeedToCreateNewLastFile() {
// // TODO
// return 0;
// }
// int tsdbCloseHelperFile(SRWHelper *pHelper, int hasErr) {
// int ret = 0;
// if (pHelper->headF.fd > 0) {
// close(pHelper->headF.fd);
// pHelper->headF.fd = -1;
// }
// if (pHelper->dataF.fd > 0) {
// close(pHelper->dataF.fd);
// pHelper->dataF.fd = -1;
// }
// if (pHelper->lastF.fd > 0) {
// close(pHelper->lastF.fd);
// pHelper->lastF.fd = -1;
// }
// if (pHelper->hF.fd > 0) {
// close(pHelper->hF.fd);
// pHelper->hF.fd = -1;
// if (hasErr) remove(pHelper->hF.fname);
// }
// if (pHelper->lF.fd > 0) {
// close(pHelper->lF.fd);
// pHelper->lF.fd = -1;
// if (hasErr) remove(pHelper->hF.fname);
// }
// return 0;
// }
// int tsdbOpenHelperFile(SRWHelper *pHelper) {
// if (pHelper->state != TSDB_RWHELPER_FILE_SET) return -1;
// if (pHelper->mode == TSDB_READ_HELPER) { // The read helper
// if (tsdbOpenFile(&pHelper->headF, O_RDONLY) < 0) goto _err;
// if (tsdbOpenFile(&pHelper->dataF, O_RDONLY) < 0) goto _err;
// if (tsdbOpenFile(&pHelper->lastF, O_RDONLY) < 0) goto _err;
// } else {
// if (tsdbOpenFile(&pHelper->headF, O_RDONLY) < 0) goto _err;
// if (tsdbOpenFile(&pHelper->dataF, O_RDWR) < 0) goto _err;
// if (tsdbOpenFile(&pHelper->lastF, O_RDWR) < 0) goto _err;
// // Open .h and .l file
// if (tsdbOpenFile(&pHelper->hF, O_WRONLY | O_CREAT) < 0) goto _err;
// if (tsdbNeedToCreateNewLastFile()) {
// if (tsdbOpenFile(&pHelper->lF, O_WRONLY | O_CREAT) < 0) goto _err;
// }
// }
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_FILE_OPENED);
// return 0;
// _err:
// tsdbCloseHelperFile(pHelper, 1);
// return -1;
// }
// int tsdbLoadCompIdx(SRWHelper *pHelper) {
// if (pHelper->state != (TSDB_RWHELPER_FILE_SET | TSDB_RWHELPER_FILE_OPENED)) return -1;
// if (lseek(pHelper->headF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
// if (tread(pHelper->headF.fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1;
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPIDX_LOADED);
// return 0;
// }
// int tsdbSetHelperTable(SRWHelper *pHelper, int32_t tid, int64_t uid, STSchema *pSchema) {
// // TODO: add some check information
// pHelper->tid = tid;
// pHelper->uid = uid;
// tdInitDataCols(pHelper->pDataCols, pSchema);
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_TABLE_SET);
// return 0;
// }
// int tsdbLoadCompBlocks(SRWHelper *pHelper) {
// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid;
// if (pIdx->offset <= 0) return 0;
// if (lseek(pHelper->headF.fd, pIdx->offset, SEEK_SET) < 0) return -1;
// if (pHelper->compInfoSize < pIdx->len) {
// pHelper->pCompInfo = (SCompInfo *)realloc((void *)(pHelper->pCompInfo), pIdx->len);
// if (pHelper->pCompInfo == NULL) return -1;
// pHelper->compInfoSize = pIdx->len;
// }
// if (tread(pHelper->headF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPBLOCK_LOADED);
// return 0;
// }
// int tsdbRWHelperSetBlockIdx(SRWHelper *pHelper, int blkIdx) {
// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid;
// if (blkIdx > pIdx->numOfSuperBlocks) return -1;
// pHelper->blkIdx = blkIdx;
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_BLOCKIDX_SET);
// return 0;
// }
// int tsdbRWHelperLoadCompData(SRWHelper *pHelper) {
// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx;
// if (pBlock->numOfSubBlocks == 1) { // Only one super block
// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols;
// if (size > pHelper->compDataSize) {
// pHelper->pCompData = (SCompData *)realloc((void *)pHelper->pCompData, size);
// if (pHelper->pCompData == NULL) return -1;
// pHelper->compDataSize = size;
// }
// if (lseek(pHelper->dataF.fd, pBlock->offset, SEEK_SET) < 0) return -1;
// if (tread(pHelper->dataF.fd, (void *)(pHelper->pCompData), size) < size) return -1;
// } else { // TODO: More sub blocks
// }
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPCOL_LOADED);
// return 0;
// }
// static int compColIdCompCol(const void *arg1, const void *arg2) {
// int colId = *(int *)arg1;
// SCompCol *pCompCol = (SCompCol *)arg2;
// return (int)(colId - pCompCol->colId);
// }
// static int compColIdDataCol(const void *arg1, const void *arg2) {
// int colId = *(int *)arg1;
// SDataCol *pDataCol = (SDataCol *)arg2;
// return (int)(colId - pDataCol->colId);
// }
// int tsdbRWHelperLoadColData(SRWHelper *pHelper, int colId) {
// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx;
// if (pBlock->numOfSubBlocks == 1) { // only one super block
// SCompCol *pCompCol = bsearch((void *)(&colId), (void *)(pHelper->pCompData->cols), pBlock->numOfCols, compColIdCompCol, compColIdCompCol);
// if (pCompCol == NULL) return 0; // No data to read from this block , but we still return 0
// SDataCol *pDataCol = bsearch((void *)(&colId), (void *)(pHelper->pDataCols->cols), pHelper->pDataCols->numOfCols, sizeof(SDataCol), compColIdDataCol);
// assert(pDataCol != NULL);
// int fd = (pBlock->last) ? pHelper->lastF.fd : pHelper->dataF.fd;
// if (lseek(fd, pBlock->offset + pCompCol->offset, SEEK_SET) < 0) return -1;
// if (tread(fd, (void *)pDataCol->pData, pCompCol->len) < pCompCol->len) return -1;
// pDataCol->len = pCompCol->len;
// } else {
// // TODO: more than 1 blocks
// }
// return 0;
// }
// int tsdbRWHelperLoadBlockData(SRWHelper *pHelper, int blkIdx) {
// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx;
// if (pBlock->numOfSubBlocks == 1) {
// for (int i = 0; i < pHelper->pDataCols->numOfCols; i++) {
// if (tsdbRWHelperLoadBlockData(pHelper, pHelper->pDataCols->cols[i].colId) < 0) return -1;
// }
// } else {
// // TODO: more than 1 block of data
// }
// return 0;
// }
// int tsdbRWHelperCopyCompBlockPart(SRWHelper *pHelper) {
// // TODO
// return 0;
// }
// int tsdbRWHelperCopyDataBlockPart(SRWHelper *pHelper ) {
// // TODO
// return 0;
// }
// int tsdbRWHelperWriteCompIdx(SRWHelper *pHelper) {
// // TODO
// if (lseek(pHelper->hF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
// if (twrite(pHelper->hF.fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1;
// return 0;
// }
// /**
// * Load the data block from file
// *
// * @return 0 for success
// * -1 for failure
// */
// int tsdbLoadDataBlock(SRWHelper *pHelper, int bldIdx) {
// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid;
// if (pIdx->)
// return 0;
// }
// /**
// * Append the block to a file, either .data
// */
// int tsdbAppendBlockToFile(SRWHelper *pHelper, tsdb_rw_file_t toFile, SDataCols *pDataCols, SCompBlock *pCompBlock, bool isSuper) {
// SFile *pFile = pHelper->files + toFile;
// int64_t offset = lseek(pFile->fd, 0, SEEK_END);
// if (*offset < 0) return -1;
// SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols);
// if (pCompData == NULL) return -1;
// int numOfNotAllNullCols = 0;
// int32_t toffset = 0;
// for (int i = 0; i < pDataCols->numOfCols; i++) {
// SDataCol *pDataCol = pDataCols->cols + i;
// SCompCol *pCompCol = pCompData->cols + numOfNotAllNullCols;
// if (0 /* All data in this column are NULL value */) {
// continue;
// }
// pCompCol->colId = pDataCol->colId;
// pCompCol->type = pDataCol->type;
// pCompCol->len = pDataCol->len;
// // pCompCol->offset = toffset;
// numOfNotAllNullCols++;
// // toffset += pDataCol->len;
// }
// pCompData->delimiter = TSDB_FILE_DELIMITER;
// pCompData->numOfCols = numOfNotAllNullCols;
// pCompData->uid = pHelper->uid;
// size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * numOfNotAllNullCols;
// if (twrite(pFile->fd, (void *)pCompData, tsize) < 0) return -1;
// for (int i = 0; i < numOfNotAllNullCols; i++) {
// SCompCol *pCompCol = pCompData->cols + i;
// SDataCol *pDataCol = NULL; // bsearch()
// tassert(pDataCol != NULL);
// if (twrite(pFile->fd, (void *)(pDataCol->pData), pDataCol->len) < pDataCol->len) return -1;
// }
// pCompBlock->last = (toFile == TSDB_RW_DATAF) ? 0 : 1;
// pCompBlock->offset = offset;
// pCompBlock->algorithm = pHelper->compression;
// pCompBlock->numOfPoints = pDataCols->numOfPoints;
// pCompBlock->sversion = pHelper->sversion;
// // pCompBlock->len = ;
// pCompBlock->numOfSubBlocks = isSuper ? 1 : 0;
// pCompBlock->numOfCols = numOfNotAllNullCols;
// pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
// pCompBlock->keyLast = dataColsKeyLast(pDataCols);
// return 0;
// }
// /**
// * Write the whole or part of the cached data block to file.
// *
// * There are four options:
// * 1. Append the whole block as a SUPER-BLOCK at the end
// * 2. Append part/whole block as a SUPER-BLOCK and insert in the middle
// * 3. Append part/whole block as a SUB-BLOCK
// * 4. Merge part/whole block as a SUPER-BLOCK
// */
// int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
// tassert(pHelper->type == TSDB_WRITE_HELPER);
// int rowsWritten = 0;
// SCompBlock compBlock;
// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid;
// // if ((no old data) OR (no last block AND cached first key is larger than the max key))
// if ((pIdx->offset == 0) || (pIdx->hasLast && dataColsKeyFirst(pDataCols) > pIdx->maxKey)) {
// // Append the whole block as a SUPER-BLOCK at the end
// if (pDataCols->numOfPoints >= pHelper->minRowPerFileBlock) {
// if (tsdbAppendBlockToFile(pHelper, TSDB_RW_DATAF, pDataCols, &compBlock, true) < 0) goto _err;
// } else {
// tsdb_rw_file_t ftype = (pHelper->files[TSDB_RW_LF].fd > 0) ? TSDB_RW_LF : TSDB_RW_LASTF;
// if (tsdbAppendBlockToFile(pHelper, ftype, pDataCols, &compBlock, true) < 0) goto _err;
// }
// // Copy the compBlock part to the end
// if (IS_COMPBLOCK_LOADED(pHelper)) {
// } else {
// }
// pIdx->hasLast = compBlock.last;
// pIdx->len += sizeof(compBlock);
// pIdx->numOfSuperBlocks++;
// pIdx->maxKey = compBlock.keyLast;
// rowsWritten = pDataCols->numOfPoints;
// } else {
// // Need to find a block to merge with
// int blkIdx = 0;
// // if (has last block AND cached Key is larger than the max Key)
// if (pIdx->hasLast && dataColsKeyFirst(pDataCols) > pIdx->maxKey) {
// blkIdx = pIdx->numOfSuperBlocks - 1;
// rowsWritten = tsdbMergeDataWithBlock(pHelper, pDataCols, blkIdx);
// if (rowsWritten < 0) goto _err;
// } else {
// ASSERT(IS_COMPBLOCK_LOADED(pHelper));
// // SCompBlock *pMergeBlock = taosbsearch();
// }
// }
// return numOfPointsWritten;
// _err:
// return -1;
// }
static int compareKeyBlock(const void *arg1, const void *arg2) {
TSKEY key = *(TSKEY *)arg1;
SCompBlock *pBlock = (SCompBlock *)arg2;
if (key < pBlock->keyFirst) {
return -1;
} else if (key > pBlock->keyLast) {
return 1;
}
return 0;
}
static int nRowsLEThan(SDataCols *pDataCols, int maxKey) {
return 0;
}
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
int rowsWritten = 0;
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
SCompBlock compBlock = {0};
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(blkIdx < pIdx->numOfSuperBlocks);
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pCompBlock->numOfSubBlocks >= 1);
int rowsCanMerge = tsdbGetRowsCanBeMergedWithBlock(pHelper, blkIdx, pDataCols);
if (rowsCanMerge < 0) goto _err;
ASSERT(rowsCanMerge > 0);
if (pCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS &&
((!pCompBlock->last) || (pHelper->files.nLastF.fd < 0 &&
pCompBlock->numOfPoints + rowsCanMerge < pHelper->config.minRowsPerFileBlock))) {
SFile *pFile = NULL;
if (!pCompBlock->last) {
pFile = &(pHelper->files.dataF);
} else {
pFile = &(pHelper->files.lastF);
}
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsCanMerge, &compBlock, pCompBlock->last) < 0) goto _err;
// TODO: Add the sub-block
if (pCompBlock->numOfSubBlocks == 1) {
pCompBlock->numOfSubBlocks += 2;
// pCompBlock->offset = ;
// pCompBlock->len = ;
} else {
pCompBlock->numOfSubBlocks++;
}
pCompBlock->numOfPoints += rowsCanMerge;
pCompBlock->keyFirst = MIN(pCompBlock->keyFirst, dataColsKeyFirst(pDataCols));
pCompBlock->keyLast = MAX(pCompBlock->keyLast, dataColsKeyAt(pDataCols, rowsCanMerge - 1));
// Update the Idx
// pIdx->hasLast = ;
// pIdx->len =;
// pIdx->numOfSuperBlocks = ;
rowsWritten = rowsCanMerge;
} else {
// Read-Merge-Write as a super block
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsCanMerge);
int isLast = 0;
SFile *pFile = NULL;
if (!pCompBlock->last || (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.minRowsPerFileBlock)) {
pFile = &(pHelper->files.dataF);
} else {
isLast = 1;
if (pHelper->files.nLastF.fd > 0) {
pFile = &(pHelper->files.nLastF);
} else {
pFile = &(pHelper->files.lastF);
}
}
if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsCanMerge, &compBlock, isLast) < 0) goto _err;
*pCompBlock = compBlock;
pIdx->maxKey = MAX(pIdx->maxKey, compBlock.keyLast);
// pIdx->hasLast = ;
// pIdx->
}
return rowsWritten;
_err:
return -1;
}
static int compTSKEY(const void *key1, const void *key2) { return ((TSKEY *)key1 - (TSKEY *)key2); }
// Get the number of rows the data can be merged into the block
static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
int rowsCanMerge = 0;
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(blkIdx < pIdx->numOfSuperBlocks);
TSKEY keyMax = (blkIdx < pIdx->numOfSuperBlocks + 1) ? (pCompBlock + 1)->keyFirst - 1 : pHelper->files.maxKey;
if (keyFirst > pCompBlock->keyLast) {
void *ptr = taosbsearch((void *)(&keyMax), pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY),
compTSKEY, TD_LE);
ASSERT(ptr != NULL);
rowsCanMerge =
MIN((TSKEY *)ptr - (TSKEY *)pDataCols->cols[0].pData, pHelper->config.minRowsPerFileBlock - pCompBlock->numOfPoints);
} else {
int32_t colId[1] = {0};
if (tsdbLoadBlockDataCols(pHelper, NULL, &colId, 1) < 0) goto _err;
int iter1 = 0; // For pDataCols
int iter2 = 0; // For loaded data cols
while (1) {
if (iter1 >= pDataCols->numOfPoints || iter2 >= pHelper->pDataCols[0]->numOfPoints) break;
if (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.maxRowsPerFileBlock) break;
TSKEY key1 = dataColsKeyAt(pDataCols, iter1);
TSKEY key2 = dataColsKeyAt(pHelper->pDataCols[0], iter2);
if (key1 > keyMax) break;
if (key1 < key2) {
iter1++;
} else if (key1 == key2) {
iter1++;
iter2++;
} else {
iter2++;
rowsCanMerge++;
}
}
}
return rowsCanMerge;
_err:
return -1;
}
\ No newline at end of file
......@@ -369,14 +369,14 @@ static int32_t getFileCompInfo(STableCheckInfo* pCheckInfo, SFileGroup* fileGrou
fileGroup->files[TSDB_FILE_TYPE_HEAD].fd = open(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname, O_RDONLY);
}
tsdbLoadCompIdx(fileGroup, pCheckInfo->compIndex, 10000); // todo set dynamic max tables
SCompIdx* compIndex = &pCheckInfo->compIndex[pCheckInfo->tableId.tid];
// tsdbLoadCompIdx(fileGroup, pCheckInfo->compIndex, 10000); // todo set dynamic max tables
// SCompIdx* compIndex = &pCheckInfo->compIndex[pCheckInfo->tableId.tid];
if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file
// if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file
} else {
tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
}
// } else {
// tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
// }
return TSDB_CODE_SUCCESS;
}
......@@ -444,7 +444,7 @@ static bool doLoadDataFromFileBlock(STsdbQueryHandle *pQueryHandle) {
pFile->fd = open(pFile->fname, O_RDONLY);
}
tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data);
// tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data);
return true;
}
......@@ -810,10 +810,10 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
pFile->fd = open(pFile->fname, O_RDONLY);
}
if (tsdbLoadDataBlock(pFile, &pCheckInfo->pCompInfo->blocks[cur->slot], 1,
pCheckInfo->pDataCols, data) == 0) {
blockLoaded = true;
}
// if (tsdbLoadDataBlock(pFile, &pCheckInfo->pCompInfo->blocks[cur->slot], 1,
// pCheckInfo->pDataCols, data) == 0) {
// blockLoaded = true;
// }
// dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files",
// GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx);
......
......@@ -54,7 +54,8 @@ TEST(TsdbTest, createRepo) {
// 1. Create a tsdb repository
tsdbSetDefaultCfg(&config);
tsdb_repo_t *pRepo = tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL);
tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL);
tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
ASSERT_NE(pRepo, nullptr);
// 2. Create a normal table
......@@ -139,42 +140,42 @@ TEST(TsdbTest, createRepo) {
}
// TEST(TsdbTest, DISABLED_openRepo) {
TEST(TsdbTest, openRepo) {
tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
ASSERT_NE(repo, nullptr);
// TEST(TsdbTest, openRepo) {
// tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
// ASSERT_NE(repo, nullptr);
STsdbRepo *pRepo = (STsdbRepo *)repo;
// STsdbRepo *pRepo = (STsdbRepo *)repo;
SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1833);
// SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1833);
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbOpenFile(&pGroup->files[type], O_RDONLY);
}
// for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
// tsdbOpenFile(&pGroup->files[type], O_RDONLY);
// }
SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx));
tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables);
// SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx));
// tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables);
SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len);
// SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len);
tsdbLoadCompBlocks(pGroup, &pIdx[0], (void *)pCompInfo);
// tsdbLoadCompBlocks(pGroup, &pIdx[0], (void *)pCompInfo);
int blockIdx = 0;
SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]);
// int blockIdx = 0;
// SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]);
SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
// SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData);
// tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData);
STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid);
SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(pTable->schema), 5, 10);
tdInitDataCols(pDataCols, pTable->schema);
// STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid);
// SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(pTable->schema), 5, 10);
// tdInitDataCols(pDataCols, pTable->schema);
tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData);
// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData);
int k = 0;
// int k = 0;
}
// }
TEST(TsdbTest, DISABLED_createFileGroup) {
SFileGroup fGroup;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册