提交 d4e84b4f 编写于 作者: H Hongze Cheng

TD-353

上级 7ca58999
......@@ -115,15 +115,18 @@ typedef struct {
typedef enum { TSDB_FILE_TYPE_HEAD = 0, TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_LAST, TSDB_FILE_TYPE_MAX } TSDB_FILE_TYPE;
typedef struct {
uint32_t offset;
uint32_t len;
uint64_t size; // total size of the file
uint64_t tombSize; // unused file size
uint32_t totalBlocks;
uint32_t totalSubBlocks;
} STsdbFileInfo;
typedef struct {
char* fname;
int fd;
uint64_t size;
uint64_t tombSize;
uint64_t totalBlocks;
uint64_t totalSubBlocks;
char* fname;
int fd;
STsdbFileInfo info;
} SFile;
typedef struct {
......@@ -201,7 +204,7 @@ typedef struct {
typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
typedef struct {
int fid;
int fid;
TSKEY minKey;
TSKEY maxKey;
// For read/write purpose
......@@ -232,7 +235,7 @@ typedef struct {
SCompInfo* pCompInfo;
bool hasOldLastBlock;
// For block set usage
SCompData* pCompData[TSDB_MAX_SUBBLOCKS];
SCompData* pCompData;
SDataCols* pDataCols[2];
void* pBuffer; // Buffer to hold the whole data block
void* compBuffer; // Buffer for temperary compress/decompress purpose
......@@ -313,9 +316,12 @@ void tsdbFreeFileH(STsdbFileH* pFileH);
#define TSDB_HELPER_TABLE_SET 0x4 // Table is set
#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
#define helperSetState(h, s) (((h)->state) |= (s))
#define helperClearState(h, s) ((h)->state &= (~(s)))
#define helperHasState(h, s) ((((h)->state) & (s)) == (s))
#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx)
#define TSDB_MAX_SUBBLOCKS 8
#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0)
#define helperType(h) (h)->type
#define helperRepo(h) (h)->pRepo
#define helperState(h) (h)->state
......@@ -333,12 +339,6 @@ void* tsdbCommitData(void* arg);
// --------- Helper state
#define TSDB_HELPER_TYPE(h) ((h)->config.type)
#define helperSetState(h, s) (((h)->state) |= (s))
#define helperClearState(h, s) ((h)->state &= (~(s)))
#define helperHasState(h, s) ((((h)->state) & (s)) == (s))
#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx)
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
......
......@@ -71,7 +71,10 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) {
char *fnameDup = strdup(pHelper->files.headF.fname);
if (fnameDup == NULL) return -1;
if (fnameDup == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
char *dataDir = dirname(fnameDup);
tsdbGetFileName(dataDir, pHelper->files.fid, ".h", pHelper->files.nHeadF.fname);
......@@ -81,21 +84,28 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// Open the files
if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) {
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err;
if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err;
// Create and open .h
if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1;
// size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM);
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE)
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
TSDB_FILE_HEAD_SIZE, hpHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
errno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// Create and open .l file if should
if (tsdbShouldCreateNewLast(pHelper)) {
if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE)
goto _err;
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
TSDB_FILE_HEAD_SIZE, hpHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
} else {
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
......@@ -164,7 +174,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
pHelper->tableInfo.tid = pTable->tableId.tid;
pHelper->tableInfo.uid = pTable->tableId.uid;
STSchema *pSchema = tsdbGetTableSchema(pRepo->tsdbMeta, pTable);
STSchema *pSchema = tsdbGetTableSchema(pTable);
pHelper->tableInfo.sversion = schemaVersion(pSchema);
tdInitDataCols(pHelper->pDataCols[0], pSchema);
......@@ -354,28 +364,45 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
if (pFile->info.offset > 0) {
ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE);
if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) return -1;
if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) return -1;
if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len) return -1;
if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s to %u since %s", REPO_ID(pHelper->pRepo), pFile->fname,
pFile->info.offset, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
// TODO: File is broken, try to deal with it
tsdbError("vgId:%d file %s SCompIdx part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo), pFile->fname,
pFile->info.offset, pFile->info.len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
// Decode it
void *ptr = pHelper->pBuffer;
while (((char *)ptr - (char *)pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) {
while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) {
uint32_t tid = 0;
if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) return -1;
ASSERT(tid > 0 && tid < pHelper->config.maxTables);
if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1;
ASSERT((char *)ptr - (char *)pHelper->pBuffer <= pFile->info.len - sizeof(TSCKSUM));
ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= pFile->info.len - sizeof(TSCKSUM));
}
ASSERT(((char *)ptr - (char *)pHelper->pBuffer) == (pFile->info.len - sizeof(TSCKSUM)));
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
}
helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);
......@@ -507,8 +534,16 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
if (lseek(pFile->fd, 0, SEEK_SET) < 0) return -1;
if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1;
if (lseek(pFile->fd, 0, SEEK_SET) < 0) {
tsdbError("failed to lseek file %s since %s", pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
......@@ -851,6 +886,9 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
}
tsdbTrace("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
blkIdx);
return 0;
_err:
......@@ -933,6 +971,8 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;
tsdbTrace("vgId:%d tid:%d a subblock is added at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx);
return 0;
_err:
......@@ -971,6 +1011,9 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;
tsdbTrace("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
blkIdx);
return 0;
}
......@@ -1001,18 +1044,26 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey)
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
pHelper->files.fid = -1;
tfree(pHelper->files.headF.fname);
pHelper->files.headF.fd = -1;
tfree(pHelper->files.dataF.fname);
pHelper->files.dataF.fd = -1;
tfree(pHelper->files.lastF.fname);
pHelper->files.lastF.fd = -1;
tfree(pHelper->files.nHeadF.fname);
pHelper->files.nHeadF.fd = -1;
tfree(pHelper->files.nLastF.fname);
pHelper->files.nLastF.fd = -1;
}
static int tsdbInitHelperFile(SRWHelper *pHelper) {
// pHelper->compIdxSize = sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM);
size_t tsize = sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM);
STsdbCfg *pCfg = &pHelper->pRepo->config;
size_t tsize = sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM);
pHelper->pCompIdx = (SCompIdx *)tmalloc(tsize);
if (pHelper->pCompIdx == NULL) return -1;
if (pHelper->pCompIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbResetHelperFileImpl(pHelper);
return 0;
......@@ -1020,6 +1071,7 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) {
static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
tsdbCloseHelperFile(pHelper, false);
tsdbResetHelperFileImpl(pHelper);
tzfree(pHelper->pCompIdx);
}
......@@ -1051,9 +1103,16 @@ static void tsdbResetHelperBlock(SRWHelper *pHelper) {
}
static int tsdbInitHelperBlock(SRWHelper *pHelper) {
pHelper->pDataCols[0] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows);
pHelper->pDataCols[1] = tdNewDataCols(pHelper->config.maxRowSize, pHelper->config.maxCols, pHelper->config.maxRows);
if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) return -1;
STsdbRepo *pRepo = helperRepo(pHelper);
pHelper->pDataCols[0] =
tdNewDataCols(pRepo->imem->maxRowBytes, pRepo->imem->maxCols, pRepo->config.maxRowsPerFileBlock);
pHelper->pDataCols[1] =
tdNewDataCols(pRepo->imem->maxRowBytes, pRepo->imem->maxCols, pRepo->config.maxRowsPerFileBlock);
if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbResetHelperBlockImpl(pHelper);
......@@ -1067,6 +1126,7 @@ static void tsdbDestroyHelperBlock(SRWHelper *pHelper) {
}
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) {
STsdbCfg *pCfg = &pRepo->config;
memset((void *)pHelper, 0, sizeof(*pHelper));
helperType(pHelper) = type;
......@@ -1083,9 +1143,12 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t
if (tsdbInitHelperBlock(pHelper) < 0) goto _err;
pHelper->pBuffer =
tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols +
pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM));
if (pHelper->pBuffer == NULL) goto _err;
tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pRepo->imem->maxCols +
pRepo->imem->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM));
if (pHelper->pBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
return 0;
......@@ -1167,13 +1230,30 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
SCompData *pCompData = (SCompData *)pHelper->pBuffer;
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err;
if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) goto _err;
SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
int fd = pFile->fd;
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (tread(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompBlock->len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);
int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err;
if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) {
tsdbError("vgId:%d file %s block data is corrupted offset %" PRId64 " len %d", REPO_ID(pHelper->pRepo),
pFile->fname, pCompBlock->offset, pCompBlock->len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
pDataCols->numOfRows = pCompBlock->numOfRows;
......@@ -1198,7 +1278,10 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows);
}
pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize);
if (pHelper->compBuffer == NULL) goto _err;
if (pHelper->compBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
}
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册