提交 f29b0fca 编写于 作者: H hzcheng

TD-34

上级 632d1865
......@@ -132,6 +132,7 @@ void tdResetDataCols(SDataCols *pCols);
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop);
#ifdef __cplusplus
}
......
......@@ -353,6 +353,10 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
}
pCols->numOfPoints++;
}
// Pop pointsToPop points from the SDataCols
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
}
/**
* Return the first part length of a data row for a schema
......
......@@ -134,12 +134,15 @@ typedef struct {
int64_t uid; // For recovery usage
SCompCol cols[];
} SCompData;
int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast);
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols);
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables);
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf);
int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf);
int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf);
int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData);
// TODO: need an API to merge all sub-block data into one
int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols);
......
......@@ -103,8 +103,59 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
return 0;
}
int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast) {
// TODO
int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) {
SCompBlock *pBlock = pStartBlock;
for (int i = 0; i < numOfBlocks; i++) {
if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1;
for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) {
SCompCol *pCompCol = &(pCompData->cols[iCol]);
pCols->numOfPoints += pBlock->numOfPoints;
int k = 0;
for (; k < pCols->numOfCols; k++) {
if (pCompCol->colId == pCols->cols[k].colId) break;
}
if (tsdbLoadColData(pFile, pCompCol, pBlock->offset,
(void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0)
return -1;
}
pStartBlock++;
}
return 0;
}
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols) {
SCompBlock *pSuperBlock = TSDB_COMPBLOCK_AT(pCompInfo, idx);
SCompBlock *pStartBlock = NULL;
SCompBlock *pBlock = NULL;
int numOfBlocks = pSuperBlock->numOfSubBlocks;
if (numOfBlocks == 1)
pStartBlock = pSuperBlock;
else
pStartBlock = TSDB_COMPBLOCK_AT(pCompInfo, pSuperBlock->offset);
int maxNumOfCols = 0;
pBlock = pStartBlock;
for (int i = 0; i < numOfBlocks; i++) {
if (pBlock->numOfCols > maxNumOfCols) maxNumOfCols = pBlock->numOfCols;
pBlock++;
}
SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * maxNumOfCols);
if (pCompData == NULL) return -1;
// Load data from the block
if (tsdbLoadDataBlock(pOutFile, pStartBlock, numOfBlocks, pCols, pCompData));
// Write data block to the file
{
// TODO
}
if (pCompData) free(pCompData);
return 0;
}
......
......@@ -88,6 +88,8 @@ static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
static void * tsdbCommitData(void *arg);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols);
static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey);
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
......@@ -872,13 +874,12 @@ static void *tsdbCommitData(void *arg) {
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) {
int hasDataToCommit = 0;
int isNewLastFile = 0;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &pRepo->config;
SFile tFile, lFile;
SFile hFile, lFile;
SFileGroup *pGroup = NULL;
SCompIdx * pIndices = NULL;
SCompInfo * pCompInfo = NULL;
......@@ -889,125 +890,181 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
for (int tid = 0; tid < pCfg->maxTables; tid++) {
STable * pTable = pMeta->tables[tid];
SSkipListIterator *pIter = iters[tid];
int isLoadCompBlocks = 0;
char dataDir[128] = "\0";
if (pIter == NULL) continue;
tdInitDataCols(pCols, pTable->schema);
int numOfWrites = 0;
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; // We keep 20% of space for merge purpose
// Loop to read columns from cache
while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) {
if (!hasDataToCommit) {
// There are data to commit to this fileId, we need to create/open it for read/write.
// At the meantime, we set the flag to prevent further create/open operations
tsdbGetDataDirName(pRepo, dataDir);
if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {
// TODO: deal with the ERROR here
}
// Open files for commit
pGroup = tsdbOpenFilesForCommit(pFileH, fid);
if (pGroup == NULL) {
// TODO: deal with the ERROR here
}
// TODO: open .h file and if neccessary, open .l file
tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0);
if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
// TODO: make it not to write the last file every time
tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
isNewLastFile = 1;
}
// Check if there are data to commit to this file
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
if (!hasDataToCommit) return 0; // No data to commit, just return
// load the SCompIdx part
pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
if (pIndices == NULL) { // TODO: deal with the ERROR
}
if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { // TODO: deal with the ERROR here
}
// sendfile those not need to changed table content
lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables,
SEEK_SET);
lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET);
for (int ttid = 0; ttid < tid; ttid++) {
SCompIdx * tIdx= &pIndices[ttid];
if (tIdx->len <= 0) continue;
if (isNewLastFile && tIdx->hasLast) {
// TODO: Need to load the SCompBlock part and copy to new last file
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len);
if (pCompInfo == NULL) { /* TODO */}
if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */}
SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1);
int numOfSubBlocks = pLastBlock->numOfSubBlocks;
assert(pLastBlock->last);
if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */}
{
if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks);
tIdx->checksum = 0;
}
write(tFile.fd, (void *)pCompInfo, tIdx->len);
tFile.size += tIdx->len;
} else {
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len);
tFile.size += tIdx->len;
}
}
// Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir);
if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {/* TODO */}
pGroup = tsdbOpenFilesForCommit(pFileH, fid);
if (pGroup == NULL) {/* TODO */}
tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0);
if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
// TODO: make it not to write the last file every time
tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
isNewLastFile = 1;
}
hasDataToCommit = 1;
}
// Load the SCompIdx
pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) *pCfg->maxTables);
if (pIndices == NULL) {/* TODO*/}
if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {/* TODO */}
SCompIdx *pIdx = &pIndices[tid];
/* The first time to write to the table, need to decide
* if it is neccessary to load the SComplock part. If it
* is needed, just load it, or, just use sendfile and
* append it.
*/
if (numOfWrites == 0 && pIdx->offset > 0) {
if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
// TODO: deal with the ERROR here
}
if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
} else {
// TODO: sendfile the prefix part
// Loop to commit data in each table
for (int tid = 0; tid < pCfg->maxTables; tid++) {
STable *pTable = pMeta->tables[tid];
SSkipListIterator *pIter = iters[tid];
SCompIdx *pIdx = &pIndices[tid];
if (!tsdbHasDataInRange(pIter, minKey, maxKey)) {
// This table does not have data in this range, just copy its head part and last
// part (if neccessary) to new file
if (pIdx->len != 0) { // has old data
if (isNewLastFile && pIdx->hasLast) {
// Need to load SCompBlock part and copy to new file
if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) {/* TODO */}
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {/* TODO */}
// Copy the last block from old last file to new file
// tsdbCopyBlockData()
} else {
pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
hFile.size += pIdx->len;
}
}
// if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) {
// // TODO: deal with the ERROR here
// }
// pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
// if (1 /* the SCompBlock part is not loaded*/) {
// // Append to .data file generate a SCompBlock and record it
// } else {
// }
// // TODO: need to reset the pCols
numOfWrites++;
continue;
}
if (pCols->numOfPoints > 0) {
// TODO: still has data to commit, commit it
}
// while () {
if (1/* SCompBlock part is loaded, write it to .head file*/) {
// TODO
} else {
// TODO: use sendfile send the old part and append the newly added part
}
// }
}
// for (int tid = 0; tid < pCfg->maxTables; tid++) {
// STable * pTable = pMeta->tables[tid];
// SSkipListIterator *pIter = iters[tid];
// int isLoadCompBlocks = 0;
// char dataDir[128] = "\0";
// if (pIter == NULL) {
// if (hasDataToCommit && isNewLastFile())
// continue;
// }
// tdInitDataCols(pCols, pTable->schema);
// int numOfWrites = 0;
// int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; // We keep 20% of space for merge purpose
// // Loop to read columns from cache
// while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) {
// if (!hasDataToCommit) {
// // There are data to commit to this fileId, we need to create/open it for read/write.
// // At the meantime, we set the flag to prevent further create/open operations
// tsdbGetDataDirName(pRepo, dataDir);
// if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {
// // TODO: deal with the ERROR here
// }
// // Open files for commit
// pGroup = tsdbOpenFilesForCommit(pFileH, fid);
// if (pGroup == NULL) {
// // TODO: deal with the ERROR here
// }
// // TODO: open .h file and if neccessary, open .l file
// tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0);
// if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
// // TODO: make it not to write the last file every time
// tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
// isNewLastFile = 1;
// }
// // load the SCompIdx part
// pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
// if (pIndices == NULL) { // TODO: deal with the ERROR
// }
// if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { // TODO: deal with the ERROR here
// }
// // sendfile those not need to changed table content
// lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables,
// SEEK_SET);
// lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET);
// for (int ttid = 0; ttid < tid; ttid++) {
// SCompIdx * tIdx= &pIndices[ttid];
// if (tIdx->len <= 0) continue;
// if (isNewLastFile && tIdx->hasLast) {
// // TODO: Need to load the SCompBlock part and copy to new last file
// pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len);
// if (pCompInfo == NULL) { /* TODO */}
// if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */}
// SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1);
// int numOfSubBlocks = pLastBlock->numOfSubBlocks;
// assert(pLastBlock->last);
// if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */}
// {
// if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks);
// tIdx->checksum = 0;
// }
// write(tFile.fd, (void *)pCompInfo, tIdx->len);
// tFile.size += tIdx->len;
// } else {
// sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len);
// tFile.size += tIdx->len;
// }
// }
// hasDataToCommit = 1;
// }
// SCompIdx *pIdx = &pIndices[tid];
// /* The first time to write to the table, need to decide
// * if it is neccessary to load the SComplock part. If it
// * is needed, just load it, or, just use sendfile and
// * append it.
// */
// if (numOfWrites == 0 && pIdx->offset > 0) {
// if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
// pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
// if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
// // TODO: deal with the ERROR here
// }
// if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
// } else {
// // TODO: sendfile the prefix part
// }
// }
// int numOfPointsWritten = tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols);
// if (numOfPointsWritten < 0) {
// // TODO: deal with the ERROR here
// }
// // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
// // if (1 /* the SCompBlock part is not loaded*/) {
// // // Append to .data file generate a SCompBlock and record it
// // } else {
// // }
// // // TODO: need to reset the pCols
// numOfWrites++;
// }
// if (pCols->numOfPoints > 0) {
// // TODO: still has data to commit, commit it
// }
// if (1/* SCompBlock part is loaded, write it to .head file*/) {
// // TODO
// } else {
// // TODO: use sendfile send the old part and append the newly added part
// }
// }
// Write the SCompIdx part
// Close all files and return
......@@ -1018,5 +1075,25 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
if (pIndices) free(pIndices);
if (pCompInfo) free(pCompInfo);
return 0;
}
static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey) {
if (pIter == NULL) return 0;
SSkipListNode *node = tSkipListIterGet(pIter);
if (node == NULL) return 0;
SDataRow row = SL_GET_NODE_DATA(node);
if (dataRowKey(row) >= minKey && dataRowKey(row) <= maxKey) return 1;
return 0;
}
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) {
SSkipListIterator *pIter = iters[i];
if (tsdbHasDataInRange(pIter, minKey, maxKey)) return 1;
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册