提交 2b14e552 编写于 作者: H hzcheng

TD-34

上级 5470057d
...@@ -355,7 +355,18 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { ...@@ -355,7 +355,18 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
} }
// Pop pointsToPop points from the SDataCols // Pop pointsToPop points from the SDataCols
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
int pointsLeft = pCols->numOfPoints - pointsToPop;
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *p_col = pCols->cols + iCol;
if (p_col->len > 0) {
p_col->len = TYPE_BYTES[p_col->type] * pointsLeft;
if (pointsLeft > 0) {
memmove((void *)(p_col->pData), (void *)((char *)(p_col->pData) + TYPE_BYTES[p_col->type] * pointsToPop), p_col->len);
}
}
}
pCols->numOfPoints = pointsLeft;
} }
/** /**
......
...@@ -148,8 +148,6 @@ int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SD ...@@ -148,8 +148,6 @@ int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SD
// TODO: need an API to merge all sub-block data into one // TODO: need an API to merge all sub-block data into one
int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols);
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -196,65 +196,6 @@ int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void ...@@ -196,65 +196,6 @@ int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void
return 0; return 0;
} }
static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write
SDataCols * pCols, // Data column buffer
int numOfPointsToWrie, // Number of points to write to the file
SCompBlock *pBlock // SCompBlock to hold block information to return
) {
// pBlock->last = 0;
// pBlock->offset = lseek(pFile->fd, 0, SEEK_END);
// // pBlock->algorithm = ;
// pBlock->numOfPoints = pCols->numOfPoints;
// // pBlock->sversion = ;
// // pBlock->len = ;
// pBlock->numOfSubBlocks = 1;
// pBlock->keyFirst = dataColsKeyFirst(pCols);
// pBlock->keyLast = dataColsKeyLast(pCols);
// for (int i = 0; i < pCols->numOfCols; i++) {
// // TODO: if all col value is NULL, do not save it
// pBlock->numOfCols++;
// pCompData->numOfCols++;
// SCompCol *pCompCol = pCompData->cols + i;
// pCompCol->colId = pCols->cols[i].colId;
// pCompCol->type = pCols->cols[i].type;
// // pCompCol->len = ;
// // pCompCol->offset = ;
// }
return 0;
}
int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols) {
memset((void *)pBlock, 0, sizeof(SCompBlock));
SFile *pFile = NULL;
SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols);
if (pCompData == NULL) return -1;
pCompData->delimiter = TSDB_FILE_DELIMITER;
// pCompData->uid = ;
if (isMerge) {
TSKEY keyFirst = dataColsKeyFirst(pCols);
// 1. Binary search the block the data can merged into
if (1/* the data should only merged into last file */) {
} else {
}
} else {
// Write directly to the file without merge
if (1/*pCols->numOfPoints < pCfg->minRowsPerFileBlock*/) {
// TODO: write the data to the last file
} else {
// TODO: wirte the data to the data file
}
}
// TODO: need to update pIdx
if (pCompData) free(pCompData);
return 0;
}
static int compFGroupKey(const void *key, const void *fgroup) { static int compFGroupKey(const void *key, const void *fgroup) {
int fid = *(int *)key; int fid = *(int *)key;
SFileGroup *pFGroup = (SFileGroup *)fgroup; SFileGroup *pFGroup = (SFileGroup *)fgroup;
......
...@@ -967,9 +967,26 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -967,9 +967,26 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols); tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols);
if (pCols->numOfPoints == 0) break; if (pCols->numOfPoints == 0) break;
// TODO int pointsWritten = 0;
int pointsWritten = 0; /* tsdbWriteBlockToFile(); */ // { // TODO : try to write the block data to file
// if (!isCompBlockLoaded) { // Just append
// if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // write directly to .data file
// lseek(pGroup->files[TSDB_FILE_TYPE_DATA], 0, SEEK_END);
// } else {
// if (isNewLastFile) { // write directly to .l file
// } else { // write directly to .last file
// }
// }
// } else { // Need to append
// // SCompBlock *pTBlock = NULL;
// }
// }
pointsWritten = pCols->numOfPoints;
tdPopDataColsPoints(pCols, pointsWritten); tdPopDataColsPoints(pCols, pointsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints;
} }
// Write the SCompBlock part // Write the SCompBlock part
...@@ -1023,3 +1040,16 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK ...@@ -1023,3 +1040,16 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK
} }
return 0; return 0;
} }
static int tsdbWriteBlockToFile(STsdbRepo *pRepo, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock) {
STsdbCfg *pCfg = &(pRepo->config);
memset((void *)pCompBlock, 0, sizeof(SCompBlock));
if (pCompInfo == NULL) {
// Just need to append to file
} else {
// Need to merge
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册