提交 ffb02d97 编写于 作者: H hzcheng

TD-34

上级 f0bd200e
...@@ -138,11 +138,10 @@ SListNode *tdListPopNode(SList *list, SListNode *node) { ...@@ -138,11 +138,10 @@ SListNode *tdListPopNode(SList *list, SListNode *node) {
// Move all node elements from src to dst, the dst is assumed as an empty list // Move all node elements from src to dst, the dst is assumed as an empty list
void tdListMove(SList *src, SList *dst) { void tdListMove(SList *src, SList *dst) {
// assert(dst->eleSize == src->eleSize); // assert(dst->eleSize == src->eleSize);
dst->numOfEles = src->numOfEles; SListNode *node = NULL;
dst->head = src->head; while ((node = tdListPopHead(src)) != NULL) {
dst->tail = src->tail; tdListAppendNode(dst, node);
src->numOfEles = 0; }
src->head = src->tail = NULL;
} }
void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); } void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); }
......
...@@ -122,6 +122,15 @@ typedef struct { ...@@ -122,6 +122,15 @@ typedef struct {
} SCompInfo; } SCompInfo;
#define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx)) #define TSDB_COMPBLOCK_AT(pCompInfo, idx) ((pCompInfo)->blocks + (idx))
#define TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pCompBlock, size)\
do {\
if (pCompBlock->numOfSubBlocks > 1) {\
pCompBlock = pCompInfo->blocks + pCompBlock->offset;\
size = pCompBlock->numOfSubBlocks;\
} else {\
size = 1;\
}\
} while (0)
// TODO: take pre-calculation into account // TODO: take pre-calculation into account
typedef struct { typedef struct {
...@@ -147,6 +156,8 @@ int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf); ...@@ -147,6 +156,8 @@ int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf);
int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf); int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf);
int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData); int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData);
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid);
// TODO: need an API to merge all sub-block data into one // TODO: need an API to merge all sub-block data into one
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
......
...@@ -35,7 +35,6 @@ static int compFGroup(const void *arg1, const void *arg2); ...@@ -35,7 +35,6 @@ static int compFGroup(const void *arg1, const void *arg2);
static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname); static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname);
static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteFileHead(SFile *pFile);
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid);
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles);
...@@ -309,7 +308,7 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file ...@@ -309,7 +308,7 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
} }
static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) {
if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId)
return NULL; return NULL;
void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey);
......
...@@ -90,6 +90,8 @@ static void * tsdbCommitData(void *arg); ...@@ -90,6 +90,8 @@ static void * tsdbCommitData(void *arg);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols);
static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey); static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey);
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len,
int64_t uid);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name) #define TSDB_GET_TABLE_BY_NAME(pRepo, name)
...@@ -330,13 +332,13 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { ...@@ -330,13 +332,13 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; pRepo->tsdbCache->imem = pRepo->tsdbCache->mem;
pRepo->tsdbCache->mem = NULL; pRepo->tsdbCache->mem = NULL;
pRepo->tsdbCache->curBlock = NULL; pRepo->tsdbCache->curBlock = NULL;
tsdbUnLockRepo(repo);
// TODO: here should set as detached or use join for memory leak // TODO: here should set as detached or use join for memory leak
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
pthread_create(&(pRepo->commitThread), &thattr, tsdbCommitData, (void *)repo); pthread_create(&(pRepo->commitThread), &thattr, tsdbCommitData, (void *)repo);
tsdbUnLockRepo(repo);
return 0; return 0;
} }
...@@ -814,7 +816,9 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) ...@@ -814,7 +816,9 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables)
} }
if (!tSkipListIterNext(iters[tid])) { if (!tSkipListIterNext(iters[tid])) {
assert(false); // No data in this iterator
tSkipListDestroyIter(iters[tid]);
iters[tid] = NULL;
} }
} }
...@@ -839,8 +843,8 @@ static void *tsdbCommitData(void *arg) { ...@@ -839,8 +843,8 @@ static void *tsdbCommitData(void *arg) {
} }
// Create a data column buffer for commit // Create a data column buffer for commit
SDataCols *pCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); SDataCols *pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock);
if (pCols == NULL) { if (pDataCols == NULL) {
// TODO: deal with the error // TODO: deal with the error
return NULL; return NULL;
} }
...@@ -849,16 +853,15 @@ static void *tsdbCommitData(void *arg) { ...@@ -849,16 +853,15 @@ static void *tsdbCommitData(void *arg) {
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
for (int fid = sfid; fid <= efid; fid++) { for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, pCols) < 0) { if (tsdbCommitToFile(pRepo, fid, iters, pDataCols) < 0) {
// TODO: deal with the error here // TODO: deal with the error here
// assert(0); // assert(0);
} }
} }
tdFreeDataCols(pCols); tdFreeDataCols(pDataCols);
tsdbDestroyTableIters(iters, pCfg->maxTables); tsdbDestroyTableIters(iters, pCfg->maxTables);
tsdbLockRepo(arg); tsdbLockRepo(arg);
tdListMove(pCache->imem->list, pCache->pool.memPool); tdListMove(pCache->imem->list, pCache->pool.memPool);
free(pCache->imem); free(pCache->imem);
...@@ -918,25 +921,52 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -918,25 +921,52 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */ if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */
} }
lseek(hFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET);
// Loop to commit data in each table // Loop to commit data in each table
for (int tid = 0; tid < pCfg->maxTables; tid++) { for (int tid = 0; tid < pCfg->maxTables; tid++) {
STable * pTable = pMeta->tables[tid]; STable * pTable = pMeta->tables[tid];
SSkipListIterator *pIter = iters[tid]; SSkipListIterator *pIter = iters[tid];
SCompIdx * pIdx = &pIndices[tid]; SCompIdx * pIdx = &pIndices[tid];
if (pTable == NULL || pIter == NULL) continue;
/* If no new data to write for this table, just write the old data to new file
* if there are.
*/
if (!tsdbHasDataInRange(pIter, minKey, maxKey)) { if (!tsdbHasDataInRange(pIter, minKey, maxKey)) {
// This table does not have data in this range, just copy its head part and last // has old data
// part (if neccessary) to new file if (pIdx->offset > 0) {
if (pIdx->offset > 0) { // has old data
if (isNewLastFile && pIdx->hasLast) { if (isNewLastFile && pIdx->hasLast) {
// Need to load SCompBlock part and copy to new file // need to move the last block to new file
if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */ if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */
} }
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
} }
// TODO: Copy the last block from old last file to new file tdInitDataCols(pCols, pTable->schema);
// tsdbCopyBlockData()
SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks);
int nBlocks = 0;
TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks);
SCompBlock tBlock;
int64_t toffset, tlen;
tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock);
tsdbWriteBlockToFileImpl(&lFile, pCols, pCols->numOfPoints, &toffset, tlen, pTable->tableId.uid);
pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks);
pTBlock->offset = toffset;
pTBlock->len = tlen;
pTBlock->numOfPoints = pCols->numOfPoints;
pTBlock->numOfSubBlocks = 1;
pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
if (nBlocks > 1) {
pIdx->len -= (sizeof(SCompBlock) * nBlocks);
}
write(hFile.fd, (void *)pCompInfo, pIdx->len);
} else { } else {
pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR); pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len); sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
...@@ -951,6 +981,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -951,6 +981,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
if (pIdx->offset > 0) { if (pIdx->offset > 0) {
if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) {
// has last block || cache key overlap with commit key // has last block || cache key overlap with commit key
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100);
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
} }
if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1;
...@@ -984,7 +1015,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -984,7 +1015,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
// // SCompBlock *pTBlock = NULL; // // SCompBlock *pTBlock = NULL;
// } // }
// } // }
pointsWritten = pCols->numOfPoints; // pointsWritten = pCols->numOfPoints;
tdPopDataColsPoints(pCols, pointsWritten); tdPopDataColsPoints(pCols, pointsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints; maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册