提交 7032405e 编写于 作者: H hzcheng

TD-34

上级 497300f1
......@@ -25,6 +25,8 @@
extern "C" {
#endif
#define TSDB_FILE_HEAD_SIZE 512
#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
......@@ -69,6 +71,7 @@ typedef struct {
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles);
void tsdbCloseFileH(STsdbFileH *pFileH);
int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose);
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
int tsdbOpenFile(SFile *pFile, int oflag);
SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
......@@ -104,6 +107,9 @@ typedef struct {
TSKEY keyLast;
} SCompBlock;
#define IS_SUPER_BLOCK(pBlock) ((pBlock)->numOfSubBlocks >= 1)
#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0)
typedef struct {
int32_t delimiter; // For recovery usage
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
......@@ -111,8 +117,7 @@ typedef struct {
SCompBlock blocks[];
} SCompInfo;
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables);
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf);
#define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx))
// TODO: take pre-calculation into account
typedef struct {
......@@ -129,6 +134,13 @@ typedef struct {
int64_t uid; // For recovery usage
SCompCol cols[];
} SCompData;
int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast);
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables);
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf);
int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf);
int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf);
// TODO: need an API to merge all sub-block data into one
int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols);
......
......@@ -24,7 +24,6 @@
#include "tsdbFile.h"
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
const char *tsdbFileSuffix[] = {
......@@ -35,8 +34,7 @@ const char *tsdbFileSuffix[] = {
static int compFGroupKey(const void *key, const void *fgroup);
static int compFGroup(const void *arg1, const void *arg2);
static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname);
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile);
static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname);
static int tsdbWriteFileHead(SFile *pFile);
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid);
......@@ -71,10 +69,10 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables)
SFileGroup fGroup;
SFileGroup *pFGroup = &fGroup;
if (tsdbSearchFGroup(pFileH, fid) == NULL) {
if (tsdbSearchFGroup(pFileH, fid) == NULL) { // if not exists, create one
pFGroup->fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) {
if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0) {
// TODO: deal with the ERROR here, remove those creaed file
return -1;
}
......@@ -105,6 +103,10 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
return 0;
}
int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast) {
// TODO
return 0;
}
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) {
SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
......@@ -127,6 +129,22 @@ int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) {
return 0;
}
int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) {
// assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1;
size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols;
if (read(pFile->fd, buf, size) < 0) return -1;
return 0;
}
int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) {
if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1;
if (read(pFile->fd, buf, pCol->len) < 0) return -1;
return 0;
}
static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write
SDataCols * pCols, // Data column buffer
int numOfPointsToWrie, // Number of points to write to the file
......@@ -229,10 +247,10 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
return 0;
}
static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) {
if (dataDir == NULL || fname == NULL || !IS_VALID_TSDB_FILE_TYPE(type)) return -1;
static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname) {
if (dataDir == NULL || fname == NULL) return -1;
sprintf(fname, "%s/f%d%s", dataDir, fileId, tsdbFileSuffix[type]);
sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix);
return 0;
}
......@@ -264,12 +282,12 @@ static int tsdbCloseFile(SFile *pFile) {
return ret;
}
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) {
int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose) {
memset((void *)pFile, 0, sizeof(SFile));
pFile->type = type;
pFile->fd = -1;
tsdbGetFileName(dataDir, fileId, type, pFile->fname);
tsdbGetFileName(dataDir, fileId, suffix, pFile->fname);
if (access(pFile->fname, F_OK) == 0) {
// File already exists
return -1;
......@@ -280,7 +298,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables,
return -1;
}
if (type == TSDB_FILE_TYPE_HEAD) {
if (writeHeader) {
if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) {
tsdbCloseFile(pFile);
return -1;
......@@ -292,7 +310,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables,
return -1;
}
tsdbCloseFile(pFile);
if (toClose) tsdbCloseFile(pFile);
return 0;
}
......
......@@ -8,6 +8,7 @@
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/sendfile.h>
#include <unistd.h>
// #include "taosdef.h"
......@@ -45,6 +46,7 @@
#define TSDB_CFG_FILE_NAME "CONFIG"
#define TSDB_DATA_DIR_NAME "data"
#define TSDB_DEFAULT_FILE_BLOCK_ROW_OPTION 0.7
#define TSDB_MAX_LAST_FILE_SIZE (1024 * 1024 * 10) // 10M
enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING };
......@@ -775,7 +777,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
tdAppendDataRowToDataCol(row, pCols);
numOfRows++;
if (numOfRows > maxRowsToRead) break;
if (numOfRows >= maxRowsToRead) break;
} while (tSkipListIterNext(pIter));
return numOfRows;
......@@ -842,7 +844,10 @@ static void *tsdbCommitData(void *arg) {
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
for (int fid = sfid; fid <= efid; fid++) {
tsdbCommitToFile(pRepo, fid, iters, pCols);
if (tsdbCommitToFile(pRepo, fid, iters, pCols) < 0) {
// TODO: deal with the error here
// assert(0);
}
}
tdFreeDataCols(pCols);
......@@ -867,7 +872,8 @@ static void *tsdbCommitData(void *arg) {
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) {
int flag = 0;
int hasDataToCommit = 0;
int isNewLastFile = 0;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
......@@ -887,97 +893,125 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
STable * pTable = pMeta->tables[tid];
SSkipListIterator *pIter = iters[tid];
int isLoadCompBlocks = 0;
char dataDir[128] = "\0";
if (pIter == NULL) continue;
tdInitDataCols(pCols, pTable->schema);
int numOfWrites = 0;
// while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) {
// break;
// if (!flag) {
// // There are data to commit to this file, we need to create/open it for read/write.
// // At the meantime, we set the flag to prevent further create/open operations
// if (tsdbCreateFGroup(pFileH, pRepo->rootDir, fid, pCfg->maxTables) < 0) {
// // TODO: deal with the ERROR here
// }
// // Open files for commit
// pGroup = tsdbOpenFilesForCommit(pFileH, fid);
// if (pGroup == NULL) {
// // TODO: deal with the ERROR here
// }
// // TODO: open .h file and if neccessary, open .l file
// {}
// pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
// if (pIndices == NULL) {
// // TODO: deal with the ERROR
// }
// // load the SCompIdx part
// if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {
// // TODO: deal with the ERROR here
// }
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; // We keep 20% of space for merge purpose
// Loop to read columns from cache
while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) {
if (!hasDataToCommit) {
// There are data to commit to this fileId, we need to create/open it for read/write.
// At the meantime, we set the flag to prevent further create/open operations
tsdbGetDataDirName(pRepo, dataDir);
// // TODO: sendfile those not need changed table content
// for (int ttid = 0; ttid < tid; ttid++) {
// // SCompIdx *pIdx = &pIndices[ttid];
// // if (pIdx->len > 0) {
// // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, pIdx->offset, 0, SEEK_CUR);
// // sendfile(fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len);
// // }
// }
// flag = 1;
// }
if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {
// TODO: deal with the ERROR here
}
// Open files for commit
pGroup = tsdbOpenFilesForCommit(pFileH, fid);
if (pGroup == NULL) {
// TODO: deal with the ERROR here
}
// TODO: open .h file and if neccessary, open .l file
tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0);
if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
// TODO: make it not to write the last file every time
tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
isNewLastFile = 1;
}
// load the SCompIdx part
pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
if (pIndices == NULL) { // TODO: deal with the ERROR
}
if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { // TODO: deal with the ERROR here
}
// sendfile those not need to changed table content
lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables,
SEEK_SET);
lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET);
for (int ttid = 0; ttid < tid; ttid++) {
SCompIdx * tIdx= &pIndices[ttid];
if (tIdx->len <= 0) continue;
if (isNewLastFile && tIdx->hasLast) {
// TODO: Need to load the SCompBlock part and copy to new last file
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len);
if (pCompInfo == NULL) { /* TODO */}
if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */}
SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1);
int numOfSubBlocks = pLastBlock->numOfSubBlocks;
assert(pLastBlock->last);
if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */}
{
if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks);
tIdx->checksum = 0;
}
write(tFile.fd, (void *)pCompInfo, tIdx->len);
tFile.size += tIdx->len;
} else {
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len);
tFile.size += tIdx->len;
}
}
// SCompIdx *pIdx = &pIndices[tid];
// /* The first time to write to the table, need to decide
// * if it is neccessary to load the SComplock part. If it
// * is needed, just load it, or, just use sendfile and
// * append it.
// */
// if (numOfWrites == 0 && pIdx->offset > 0) {
// if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
// pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
// if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
// // TODO: deal with the ERROR here
// }
// if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
// } else {
// // TODO: sendfile the prefix part
// }
// }
hasDataToCommit = 1;
}
// // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) {
// // // TODO: deal with the ERROR here
// // }
SCompIdx *pIdx = &pIndices[tid];
// // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
/* The first time to write to the table, need to decide
* if it is neccessary to load the SComplock part. If it
* is needed, just load it, or, just use sendfile and
* append it.
*/
if (numOfWrites == 0 && pIdx->offset > 0) {
if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
// TODO: deal with the ERROR here
}
if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
} else {
// TODO: sendfile the prefix part
}
}
// if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) {
// // TODO: deal with the ERROR here
// }
// // if (1 /* the SCompBlock part is not loaded*/) {
// // // Append to .data file generate a SCompBlock and record it
// // } else {
// // }
// pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
// // // TODO: need to reset the pCols
// numOfWrites++;
// if (1 /* the SCompBlock part is not loaded*/) {
// // Append to .data file generate a SCompBlock and record it
// } else {
// }
// if (pCols->numOfPoints > 0) {
// // TODO: still has data to commit, commit it
// }
// // TODO: need to reset the pCols
// if (1/* SCompBlock part is loaded, write it to .head file*/) {
// // TODO
// } else {
// // TODO: use sendfile send the old part and append the newly added part
// }
numOfWrites++;
}
if (pCols->numOfPoints > 0) {
// TODO: still has data to commit, commit it
}
if (1/* SCompBlock part is loaded, write it to .head file*/) {
// TODO
} else {
// TODO: use sendfile send the old part and append the newly added part
}
}
// Write the SCompIdx part
// Close all files and return
if (flag) {
if (hasDataToCommit) {
// TODO
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册