提交 bed8890f 编写于 作者: H hzcheng

TD-100

上级 253c7e64
......@@ -385,10 +385,11 @@ typedef struct {
SHelperFile files;
SHelperTable tableInfo;
SCompIdx compIdx; // SCompIdx of current table
// ---------- For read purpose
int8_t state; // current loading state
// Information in .head file
SCompIdx *pCompIdx;
size_t compIdxSize;
......@@ -396,33 +397,24 @@ typedef struct {
size_t compInfoSize;
int blockIter; // For write purpose
// Information in .data or .last file
SCompData *pCompData;
size_t compDataSize;
SDataCols *pDataCols[2];
// ---------- For read purpose
bool hasLast;
int newBlocks;
SCompIdx *pWCompIdx;
size_t wCompIdxSize;
SCompInfo *pWCompInfo;
size_t wCompInfoSize;
SCompData *pWCompData;
size_t wCompDataSize;
// Compression buffer
void * cBuffer;
size_t cBufSize;
} SRWHelper;
// --------- Helper state
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
#define TSDB_HELPER_FILE_SET 0x1 // File is set
#define TSDB_HELPER_FILE_OPEN 0x2 // File is opened
#define TSDB_HELPER_IDX_LOAD 0x4 // SCompIdx part is loaded
#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set
#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded
#define TSDB_HELPER_TABLE_SET 0x4 // Table is set
#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
#define TSDB_HELPER_TYPE(h) ((h)->config.type)
......@@ -435,8 +427,7 @@ void tsdbDestroyHelper(SRWHelper *pHelper);
void tsdbClearHelper(SRWHelper *pHelper);
// --------- For set operations
int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup);
int tsdbOpenHelperFile(SRWHelper *pHelper);
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup);
void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema);
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError);
......
......@@ -889,23 +889,20 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
tsdbGetDataDirName(pRepo, dataDir);
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) goto _err;
// Set the file to write/read
tsdbSetHelperFile(pHelper, pGroup);
// Open files for write/read
if (tsdbOpenHelperFile(pHelper) < 0) goto _err;
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) goto _err;
// Loop to commit data in each table
for (int tid = 0; tid < pCfg->maxTables; tid++) {
STable * pTable = pMeta->tables[tid];
SSkipListIterator *pIter = iters[tid];
// Set the helper and the buffer dataCols object to help to write this table
SHelperTable hTable = {.uid = pTable->tableId.uid, .tid = pTable->tableId.tid, .sversion = pTable->sversion};
tsdbSetHelperTable(pHelper, &hTable, tsdbGetTableSchema(pMeta, pTable));
tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable));
// Loop to write the data in the cache to files, if no data to write, just break
// the loop
// Loop to write the data in the cache to files. If no data to write, just break the loop
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
while (true) {
int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols);
......@@ -939,6 +936,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
return 0;
_err:
ASSERT(false);
tsdbCloseHelperFile(pHelper, 1);
return -1;
}
......
......@@ -35,7 +35,7 @@ static void tsdbClearHelperRead(SRWHelper *pHelper);
static void tsdbClearHelperWrite(SRWHelper *pHelper);
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
bool isLast);
bool isLast, bool isSuperBlock);
static int compareKeyBlock(const void *arg1, const void *arg2);
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
static int nRowsLEThan(SDataCols *pDataCols, int maxKey);
......@@ -78,17 +78,22 @@ void tsdbClearHelper(SRWHelper *pHelper) {
tsdbClearHelperWrite(pHelper);
}
int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// TODO: reset the helper object
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
ASSERT(pHelper != NULL && pGroup != NULL);
pHelper->files.fid = pGroup->fileId;
// Clear the helper object
tsdbClearHelper(pHelper);
ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE);
// Set the files
pHelper->files.fid = pGroup->fileId;
pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) {
char *fnameDup = strdup(pHelper->files.headF.fname);
if (fnameDup == NULL) goto _err;
if (fnameDup == NULL) return -1;
char *dataDir = dirname(fnameDup);
......@@ -96,32 +101,33 @@ int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
tsdbGetFileName(dataDir, pHelper->files.fid, ".l", pHelper->files.nLastF.fname);
free((void *)fnameDup);
}
return 0;
}
int tsdbOpenHelperFile(SRWHelper *pHelper) {
// TODO: check if the file is set
{}
// Open the files
if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) {
if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err;
if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err;
// TODO: need to write head and compIdx part
// Create and open .h
if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) goto _err;
size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables;
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, tsize) < tsize) goto _err;
// Create and open .l file if should
if (tsdbShouldCreateNewLast(pHelper)) {
if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) goto _err;
}
} else {
if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err;
}
return 0;
helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN);
_err:
tsdbCloseHelperFile(pHelper, true);
return tsdbLoadCompIdx(pHelper, NULL);
_err:
return -1;
}
......@@ -153,31 +159,44 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
}
void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema) {
// TODO: check if it is available to set the table
ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN));
// Clear members and state used by previous table
pHelper->blockIter = 0;
pHelper->state &= (TSDB_HELPER_TABLE_SET - 1);
pHelper->tableInfo = *pHelperTable;
// TODO: Set the pDataCols according to schema
tdInitDataCols(pHelper->pDataCols[0], pSchema);
tdInitDataCols(pHelper->pDataCols[1], pSchema);
pHelper->compIdx = pHelper->pCompIdx[pHelper->tableInfo.tid];
// TODO: set state
helperSetState(pHelper, TSDB_HELPER_TABLE_SET);
}
/**
* Write part of of points from pDataCols to file
*
* @return: number of points written to file successfully
* -1 for failure
*/
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET) && helperHasState(pHelper, TSDB_HELPER_FILE_OPEN));
SCompBlock compBlock;
int rowsToWrite = 0;
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
// Load SCompIdx part if not loaded yet
if ((!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) && (tsdbLoadCompIdx(pHelper, NULL) < 0)) goto _err;
ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
SCompIdx curIdx = pHelper->compIdx; // old table SCompIdx for sendfile usage
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose
// Load the SCompInfo part if neccessary
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
if ((pIdx->offset > 0) && (pIdx->hasLast || dataColsKeyFirst(pDataCols) <= pIdx->maxKey)) {
if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err;
}
SCompIdx *pWIdx = pHelper->pWCompIdx + pHelper->tableInfo.tid;
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
if ((!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) &&
((pIdx->offset > 0) && (pIdx->hasLast || dataColsKeyFirst(pDataCols) <= pIdx->maxKey)) &&
(tsdbLoadCompInfo(pHelper, NULL) < 0))
goto _err;
if (!pIdx->hasLast && keyFirst > pIdx->maxKey) {
// Just need to append as a super block
......@@ -189,10 +208,10 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
pWFile = &(pHelper->files.dataF);
} else {
isLast = true;
pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.nLastF);
pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
}
if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast) < 0) goto _err;
if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err;
// TODO: may need to reallocate the memory
pHelper->pCompInfo->blocks[pHelper->blockIter++] = compBlock;
......@@ -259,39 +278,44 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
}
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
// TODO: check helper state
ASSERT(!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
ASSERT(pHelper->state = TSDB_HELPER_FILE_SET_AND_OPEN);
int fd = pHelper->files.headF.fd;
if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
// If not load from file, just load it in object
int fd = pHelper->files.headF.fd;
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
if (tread(fd, pHelper->pCompIdx, pHelper->compIdxSize) < pHelper->compIdxSize) return -1;
// TODO: check the checksum
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
if (tread(fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1;
// TODO: check the correctness of the part
}
helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);
// Copy the memory for outside usage
if (target) memcpy(target, pHelper->pCompIdx, pHelper->compIdxSize);
helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);
return 0;
}
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
SCompIdx curCompIdx = pHelper->compIdx;
ASSERT(pIdx->offset > 0);
ASSERT(curCompIdx.offset > 0 && curCompIdx.len > 0);
int fd = pHelper->files.headF.fd;
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1;
ASSERT(pIdx->len > 0);
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
if (lseek(fd, curCompIdx.offset, SEEK_SET) < 0) return -1;
adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, pIdx->len);
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < 0) return -1;
// TODO: check the checksum
adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, curCompIdx.len);
if (tread(fd, (void *)(pHelper->pCompInfo), pHelper) < 0) return -1;
// TODO: check the checksum
// TODO: think about when target has no space for the content
if (target) memcpy(target, (void *)(pHelper->pCompInfo), pIdx->len);
helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
}
helperSetState(pHelper, TSDB_HELPER_INFO_LOAD);
if (target) memcpy(target, (void *)(pHelper->pCompInfo), curCompIdx.len);
return 0;
}
......@@ -376,21 +400,21 @@ static void tsdbDestroyHelperRead(SRWHelper *pHelper) {
static int tsdbInitHelperWrite(SRWHelper *pHelper) {
SHelperCfg *pCfg = &(pHelper->config);
pHelper->wCompIdxSize = pCfg->maxTables * sizeof(SCompIdx);
if ((pHelper->pWCompIdx = (SCompIdx *)malloc(pHelper->wCompIdxSize)) == NULL) return -1;
// pHelper->wCompIdxSize = pCfg->maxTables * sizeof(SCompIdx);
// if ((pHelper->pWCompIdx = (SCompIdx *)malloc(pHelper->wCompIdxSize)) == NULL) return -1;
return 0;
}
static void tsdbDestroyHelperWrite(SRWHelper *pHelper) {
tfree(pHelper->pWCompIdx);
pHelper->wCompIdxSize = 0;
// tfree(pHelper->pWCompIdx);
// pHelper->wCompIdxSize = 0;
tfree(pHelper->pWCompInfo);
pHelper->wCompInfoSize = 0;
// tfree(pHelper->pWCompInfo);
// pHelper->wCompInfoSize = 0;
tfree(pHelper->pWCompData);
pHelper->wCompDataSize = 0;
// tfree(pHelper->pWCompData);
// pHelper->wCompDataSize = 0;
}
static void tsdbClearHelperRead(SRWHelper *pHelper) {
......@@ -407,17 +431,21 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
}
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
bool isLast) {
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints);
bool isLast, bool isSuperBlock) {
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints &&
rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
SCompData *pCompData = NULL;
int64_t offset = 0;
int64_t offset = lseek(pFile->fd, 0, SEEK_END);
offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) goto _err;
SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols);
pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols);
if (pCompData == NULL) goto _err;
int nColsNotAllNull = 0;
int32_t toffset;
int32_t toffset = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
SDataCol *pDataCol = pDataCols->cols + ncol;
SCompCol *pCompCol = pCompData->cols + nColsNotAllNull;
......@@ -427,481 +455,59 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
continue;
}
// Compress the data here
{}
pCompCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type;
pCompCol->len = pDataCol->len;
pCompCol->len = TYPE_BYTES[pCompCol->type] * rowsToWrite; // TODO: change it
pCompCol->offset = toffset;
nColsNotAllNull++;
toffset += pCompCol->len;
}
ASSERT(nColsNotAllNull > 0);
pCompData->delimiter = TSDB_FILE_DELIMITER;
pCompData->uid = pHelper->tableInfo.uid;
pCompData->numOfCols = nColsNotAllNull;
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * nColsNotAllNull;
if (twrite(pFile->fd, (void *)pCompData, tsize) < tsize) goto _err;
for (int i = 0; i < pDataCols->numOfCols; i++) {
SDataCol *pDataCol = pCompData->cols + i;
SCompCol *pCompCol = NULL;
if (twrite(pFile->fd, (void *)(pDataCol->pData), pCompCol->len) < pCompCol->len) goto _err;
int nCompCol = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
ASSERT(nCompCol < nColsNotAllNull);
SDataCol *pDataCol = pDataCols->cols + ncol;
SCompCol *pCompCol = pCompData->cols + nCompCol;
if (pDataCol->colId == pCompCol->colId) {
if (twrite(pFile->fd, (void *)(pDataCol->pData), pCompCol->len) < pCompCol->len) goto _err;
tsize += pCompCol->len;
nCompCol++;
}
}
pCompBlock->last = isLast;
pCompBlock->offset = offset;
// pCOmpBlock->algorithm = ;
pCompBlock->algorithm = 2; // TODO
pCompBlock->numOfPoints = rowsToWrite;
pCompBlock->sversion = pHelper->tableInfo.sversion;
// pCompBlock->len = ;
// pCompBlock->numOfSubBlocks = ;
pCompBlock->len = (int32_t)tsize;
pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
pCompBlock->numOfCols = nColsNotAllNull;
// pCompBlock->keyFirst = ;
// pCompBlock->keyLast = ;
pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);
tfree(pCompData);
return 0;
_err:
tfree(pCompData);
return -1;
}
// static int compareKeyBlock(const void *arg1, const void *arg2);
// /**
// * Init a read-write helper object for read or write usage.
// */
// int tsdbInitHelper(SRWHelper *pHelper, int maxTables, tsdb_rwhelper_t type, int maxRowSize, int maxRows,
// int maxCols) {
// if (pHelper == NULL) return -1;
// memset((void *)pHelper, 0, sizeof(SRWHelper));
// for (int ftype = TSDB_RW_HEADF; ftype <= TSDB_RW_LF; ftype++) {
// pHelper->files[ftype] = -1;
// }
// // Set type
// pHelper->type = type;
// // Set global configuration
// pHelper->maxTables = maxTables;
// pHelper->maxRowSize = maxRowSize;
// pHelper->maxRows = maxRows;
// pHelper->maxCols = maxCols;
// // Allocate SCompIdx part memory
// pHelper->compIdxSize = sizeof(SCompIdx) * maxTables;
// pHelper->pCompIdx = (SCompIdx *)malloc(pHelper->compIdxSize);
// if (pHelper->pCompIdx == NULL) goto _err;
// pHelper->compDataSize = sizeof(SCompData) + sizeof(SCompCol) * maxCols;
// pHelper->pCompData = (SCompData *)malloc(pHelper->compDataSize);
// pHelper->pDataCols = tdNewDataCols(maxRowSize, maxCols, maxRows);
// if (pHelper->pDataCols == NULL) goto _err;
// return 0;
// _err:
// tsdbDestroyHelper(pHelper);
// return -1;
// }
// void tsdbResetHelper(SRWHelper *pHelper) {
// if (pHelper->headF.fd > 0) {
// close(pHelper->headF.fd);
// pHelper->headF.fd = -1;
// }
// if (pHelper->dataF.fd > 0) {
// close(pHelper->dataF.fd);
// pHelper->dataF.fd = -1;
// }
// if (pHelper->lastF.fd > 0) {
// close(pHelper->lastF.fd);
// pHelper->lastF.fd = -1;
// }
// if (pHelper->hF.fd > 0) {
// close(pHelper->hF.fd);
// pHelper->hF.fd = -1;
// }
// if (pHelper->lF.fd > 0) {
// close(pHelper->lF.fd);
// pHelper->lF.fd = -1;
// }
// pHelper->state = 0;
// tdResetDataCols(pHelper->pDataCols);
// }
// int tsdbDestroyHelper(SRWHelper *pHelper) {
// if (pHelper->headF.fd > 0) close(pHelper->headF.fd);
// if (pHelper->dataF.fd > 0) close(pHelper->dataF.fd);
// if (pHelper->lastF.fd > 0) close(pHelper->lastF.fd);
// if (pHelper->hF.fd > 0) close(pHelper->hF.fd);
// if (pHelper->lF.fd > 0) close(pHelper->lF.fd);
// if (pHelper->pCompIdx) free(pHelper->pCompIdx);
// if (pHelper->pCompInfo) free(pHelper->pCompInfo);
// if (pHelper->pCompData) free(pHelper->pCompData);
// memset((void *)pHelper, 0, sizeof(SRWHelper));
// return 0;
// }
// int tsdbSetHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// if (pHelper->state != 0) return -1;
// pHelper->fid = pGroup->fileId;
// pHelper->headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
// pHelper->headF.fd = -1;
// pHelper->dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
// pHelper->dataF.fd = -1;
// pHelper->lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
// pHelper->lastF.fd = -1;
// if (pHelper->mode == TSDB_WRITE_HELPER) {
// char *fnameCpy = strdup(pHelper->headF.fname);
// if (fnameCpy == NULL) return -1;
// char *dataDir = dirname(fnameCpy);
// memset((void *)(&pHelper->hF), 0, sizeof(SFile));
// memset((void *)(&pHelper->lF), 0, sizeof(SFile));
// pHelper->hF.fd = -1;
// pHelper->lF.fd = -1;
// tsdbGetFileName(dataDir, pHelper->fid, ".h", pHelper->hF.fname);
// tsdbGetFileName(dataDir, pHelper->fid, ".l", pHelper->lF.fname);
// free((char *)fnameCpy);
// }
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_FILE_SET);
// return 0;
// }
// static int tsdbNeedToCreateNewLastFile() {
// // TODO
// return 0;
// }
// int tsdbCloseHelperFile(SRWHelper *pHelper, int hasErr) {
// int ret = 0;
// if (pHelper->headF.fd > 0) {
// close(pHelper->headF.fd);
// pHelper->headF.fd = -1;
// }
// if (pHelper->dataF.fd > 0) {
// close(pHelper->dataF.fd);
// pHelper->dataF.fd = -1;
// }
// if (pHelper->lastF.fd > 0) {
// close(pHelper->lastF.fd);
// pHelper->lastF.fd = -1;
// }
// if (pHelper->hF.fd > 0) {
// close(pHelper->hF.fd);
// pHelper->hF.fd = -1;
// if (hasErr) remove(pHelper->hF.fname);
// }
// if (pHelper->lF.fd > 0) {
// close(pHelper->lF.fd);
// pHelper->lF.fd = -1;
// if (hasErr) remove(pHelper->hF.fname);
// }
// return 0;
// }
// int tsdbOpenHelperFile(SRWHelper *pHelper) {
// if (pHelper->state != TSDB_RWHELPER_FILE_SET) return -1;
// if (pHelper->mode == TSDB_READ_HELPER) { // The read helper
// if (tsdbOpenFile(&pHelper->headF, O_RDONLY) < 0) goto _err;
// if (tsdbOpenFile(&pHelper->dataF, O_RDONLY) < 0) goto _err;
// if (tsdbOpenFile(&pHelper->lastF, O_RDONLY) < 0) goto _err;
// } else {
// if (tsdbOpenFile(&pHelper->headF, O_RDONLY) < 0) goto _err;
// if (tsdbOpenFile(&pHelper->dataF, O_RDWR) < 0) goto _err;
// if (tsdbOpenFile(&pHelper->lastF, O_RDWR) < 0) goto _err;
// // Open .h and .l file
// if (tsdbOpenFile(&pHelper->hF, O_WRONLY | O_CREAT) < 0) goto _err;
// if (tsdbNeedToCreateNewLastFile()) {
// if (tsdbOpenFile(&pHelper->lF, O_WRONLY | O_CREAT) < 0) goto _err;
// }
// }
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_FILE_OPENED);
// return 0;
// _err:
// tsdbCloseHelperFile(pHelper, 1);
// return -1;
// }
// int tsdbLoadCompIdx(SRWHelper *pHelper) {
// if (pHelper->state != (TSDB_RWHELPER_FILE_SET | TSDB_RWHELPER_FILE_OPENED)) return -1;
// if (lseek(pHelper->headF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
// if (tread(pHelper->headF.fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1;
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPIDX_LOADED);
// return 0;
// }
// int tsdbSetHelperTable(SRWHelper *pHelper, int32_t tid, int64_t uid, STSchema *pSchema) {
// // TODO: add some check information
// pHelper->tid = tid;
// pHelper->uid = uid;
// tdInitDataCols(pHelper->pDataCols, pSchema);
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_TABLE_SET);
// return 0;
// }
// int tsdbLoadCompBlocks(SRWHelper *pHelper) {
// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid;
// if (pIdx->offset <= 0) return 0;
// if (lseek(pHelper->headF.fd, pIdx->offset, SEEK_SET) < 0) return -1;
// if (pHelper->compInfoSize < pIdx->len) {
// pHelper->pCompInfo = (SCompInfo *)realloc((void *)(pHelper->pCompInfo), pIdx->len);
// if (pHelper->pCompInfo == NULL) return -1;
// pHelper->compInfoSize = pIdx->len;
// }
// if (tread(pHelper->headF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPBLOCK_LOADED);
// return 0;
// }
// int tsdbRWHelperSetBlockIdx(SRWHelper *pHelper, int blkIdx) {
// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid;
// if (blkIdx > pIdx->numOfSuperBlocks) return -1;
// pHelper->blkIdx = blkIdx;
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_BLOCKIDX_SET);
// return 0;
// }
// int tsdbRWHelperLoadCompData(SRWHelper *pHelper) {
// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx;
// if (pBlock->numOfSubBlocks == 1) { // Only one super block
// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols;
// if (size > pHelper->compDataSize) {
// pHelper->pCompData = (SCompData *)realloc((void *)pHelper->pCompData, size);
// if (pHelper->pCompData == NULL) return -1;
// pHelper->compDataSize = size;
// }
// if (lseek(pHelper->dataF.fd, pBlock->offset, SEEK_SET) < 0) return -1;
// if (tread(pHelper->dataF.fd, (void *)(pHelper->pCompData), size) < size) return -1;
// } else { // TODO: More sub blocks
// }
// TSDB_SET_RWHELPER_STATE(pHelper, TSDB_RWHELPER_COMPCOL_LOADED);
// return 0;
// }
// static int compColIdCompCol(const void *arg1, const void *arg2) {
// int colId = *(int *)arg1;
// SCompCol *pCompCol = (SCompCol *)arg2;
// return (int)(colId - pCompCol->colId);
// }
// static int compColIdDataCol(const void *arg1, const void *arg2) {
// int colId = *(int *)arg1;
// SDataCol *pDataCol = (SDataCol *)arg2;
// return (int)(colId - pDataCol->colId);
// }
// int tsdbRWHelperLoadColData(SRWHelper *pHelper, int colId) {
// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx;
// if (pBlock->numOfSubBlocks == 1) { // only one super block
// SCompCol *pCompCol = bsearch((void *)(&colId), (void *)(pHelper->pCompData->cols), pBlock->numOfCols, compColIdCompCol, compColIdCompCol);
// if (pCompCol == NULL) return 0; // No data to read from this block , but we still return 0
// SDataCol *pDataCol = bsearch((void *)(&colId), (void *)(pHelper->pDataCols->cols), pHelper->pDataCols->numOfCols, sizeof(SDataCol), compColIdDataCol);
// assert(pDataCol != NULL);
// int fd = (pBlock->last) ? pHelper->lastF.fd : pHelper->dataF.fd;
// if (lseek(fd, pBlock->offset + pCompCol->offset, SEEK_SET) < 0) return -1;
// if (tread(fd, (void *)pDataCol->pData, pCompCol->len) < pCompCol->len) return -1;
// pDataCol->len = pCompCol->len;
// } else {
// // TODO: more than 1 blocks
// }
// return 0;
// }
// int tsdbRWHelperLoadBlockData(SRWHelper *pHelper, int blkIdx) {
// SCompBlock *pBlock = pHelper->pCompInfo->blocks + pHelper->blkIdx;
// if (pBlock->numOfSubBlocks == 1) {
// for (int i = 0; i < pHelper->pDataCols->numOfCols; i++) {
// if (tsdbRWHelperLoadBlockData(pHelper, pHelper->pDataCols->cols[i].colId) < 0) return -1;
// }
// } else {
// // TODO: more than 1 block of data
// }
// return 0;
// }
// int tsdbRWHelperCopyCompBlockPart(SRWHelper *pHelper) {
// // TODO
// return 0;
// }
// int tsdbRWHelperCopyDataBlockPart(SRWHelper *pHelper ) {
// // TODO
// return 0;
// }
// int tsdbRWHelperWriteCompIdx(SRWHelper *pHelper) {
// // TODO
// if (lseek(pHelper->hF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
// if (twrite(pHelper->hF.fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1;
// return 0;
// }
// /**
// * Load the data block from file
// *
// * @return 0 for success
// * -1 for failure
// */
// int tsdbLoadDataBlock(SRWHelper *pHelper, int bldIdx) {
// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid;
// if (pIdx->)
// return 0;
// }
// /**
// * Append the block to a file, either .data
// */
// int tsdbAppendBlockToFile(SRWHelper *pHelper, tsdb_rw_file_t toFile, SDataCols *pDataCols, SCompBlock *pCompBlock, bool isSuper) {
// SFile *pFile = pHelper->files + toFile;
// int64_t offset = lseek(pFile->fd, 0, SEEK_END);
// if (*offset < 0) return -1;
// SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pDataCols->numOfCols);
// if (pCompData == NULL) return -1;
// int numOfNotAllNullCols = 0;
// int32_t toffset = 0;
// for (int i = 0; i < pDataCols->numOfCols; i++) {
// SDataCol *pDataCol = pDataCols->cols + i;
// SCompCol *pCompCol = pCompData->cols + numOfNotAllNullCols;
// if (0 /* All data in this column are NULL value */) {
// continue;
// }
// pCompCol->colId = pDataCol->colId;
// pCompCol->type = pDataCol->type;
// pCompCol->len = pDataCol->len;
// // pCompCol->offset = toffset;
// numOfNotAllNullCols++;
// // toffset += pDataCol->len;
// }
// pCompData->delimiter = TSDB_FILE_DELIMITER;
// pCompData->numOfCols = numOfNotAllNullCols;
// pCompData->uid = pHelper->uid;
// size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * numOfNotAllNullCols;
// if (twrite(pFile->fd, (void *)pCompData, tsize) < 0) return -1;
// for (int i = 0; i < numOfNotAllNullCols; i++) {
// SCompCol *pCompCol = pCompData->cols + i;
// SDataCol *pDataCol = NULL; // bsearch()
// tassert(pDataCol != NULL);
// if (twrite(pFile->fd, (void *)(pDataCol->pData), pDataCol->len) < pDataCol->len) return -1;
// }
// pCompBlock->last = (toFile == TSDB_RW_DATAF) ? 0 : 1;
// pCompBlock->offset = offset;
// pCompBlock->algorithm = pHelper->compression;
// pCompBlock->numOfPoints = pDataCols->numOfPoints;
// pCompBlock->sversion = pHelper->sversion;
// // pCompBlock->len = ;
// pCompBlock->numOfSubBlocks = isSuper ? 1 : 0;
// pCompBlock->numOfCols = numOfNotAllNullCols;
// pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
// pCompBlock->keyLast = dataColsKeyLast(pDataCols);
// return 0;
// }
// /**
// * Write the whole or part of the cached data block to file.
// *
// * There are four options:
// * 1. Append the whole block as a SUPER-BLOCK at the end
// * 2. Append part/whole block as a SUPER-BLOCK and insert in the middle
// * 3. Append part/whole block as a SUB-BLOCK
// * 4. Merge part/whole block as a SUPER-BLOCK
// */
// int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
// tassert(pHelper->type == TSDB_WRITE_HELPER);
// int rowsWritten = 0;
// SCompBlock compBlock;
// SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tid;
// // if ((no old data) OR (no last block AND cached first key is larger than the max key))
// if ((pIdx->offset == 0) || (pIdx->hasLast && dataColsKeyFirst(pDataCols) > pIdx->maxKey)) {
// // Append the whole block as a SUPER-BLOCK at the end
// if (pDataCols->numOfPoints >= pHelper->minRowPerFileBlock) {
// if (tsdbAppendBlockToFile(pHelper, TSDB_RW_DATAF, pDataCols, &compBlock, true) < 0) goto _err;
// } else {
// tsdb_rw_file_t ftype = (pHelper->files[TSDB_RW_LF].fd > 0) ? TSDB_RW_LF : TSDB_RW_LASTF;
// if (tsdbAppendBlockToFile(pHelper, ftype, pDataCols, &compBlock, true) < 0) goto _err;
// }
// // Copy the compBlock part to the end
// if (IS_COMPBLOCK_LOADED(pHelper)) {
// } else {
// }
// pIdx->hasLast = compBlock.last;
// pIdx->len += sizeof(compBlock);
// pIdx->numOfSuperBlocks++;
// pIdx->maxKey = compBlock.keyLast;
// rowsWritten = pDataCols->numOfPoints;
// } else {
// // Need to find a block to merge with
// int blkIdx = 0;
// // if (has last block AND cached Key is larger than the max Key)
// if (pIdx->hasLast && dataColsKeyFirst(pDataCols) > pIdx->maxKey) {
// blkIdx = pIdx->numOfSuperBlocks - 1;
// rowsWritten = tsdbMergeDataWithBlock(pHelper, pDataCols, blkIdx);
// if (rowsWritten < 0) goto _err;
// } else {
// ASSERT(IS_COMPBLOCK_LOADED(pHelper));
// // SCompBlock *pMergeBlock = taosbsearch();
// }
// }
// return numOfPointsWritten;
// _err:
// return -1;
// }
static int compareKeyBlock(const void *arg1, const void *arg2) {
TSKEY key = *(TSKEY *)arg1;
SCompBlock *pBlock = (SCompBlock *)arg2;
......@@ -947,7 +553,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
pFile = &(pHelper->files.lastF);
}
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsCanMerge, &compBlock, pCompBlock->last) < 0) goto _err;
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, rowsCanMerge, &compBlock, pCompBlock->last, false) < 0) goto _err;
// TODO: Add the sub-block
if (pCompBlock->numOfSubBlocks == 1) {
......@@ -985,7 +591,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
}
}
if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsCanMerge, &compBlock, isLast) < 0) goto _err;
if (tsdbWriteBlockToFile(pHelper, pFile, pHelper->pDataCols[0], pCompBlock->numOfPoints + rowsCanMerge, &compBlock, isLast, true) < 0) goto _err;
*pCompBlock = compBlock;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册