提交 520e97be 编写于 作者: H Hongze Cheng

fix maxtables problem

上级 e52fd48d
...@@ -465,7 +465,7 @@ typedef struct { ...@@ -465,7 +465,7 @@ typedef struct {
SCompData *pCompData; SCompData *pCompData;
SDataCols *pDataCols[2]; SDataCols *pDataCols[2];
void *blockBuffer; // Buffer to hold the whole data block void *pBuffer; // Buffer to hold the whole data block
void *compBuffer; // Buffer for temperary compress/decompress purpose void *compBuffer; // Buffer for temperary compress/decompress purpose
} SRWHelper; } SRWHelper;
...@@ -512,6 +512,7 @@ void tsdbFitRetention(STsdbRepo *pRepo); ...@@ -512,6 +512,7 @@ void tsdbFitRetention(STsdbRepo *pRepo);
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
void tsdbAdjustCacheBlocks(STsdbCache *pCache); void tsdbAdjustCacheBlocks(STsdbCache *pCache);
int32_t tsdbGetMetaFileName(char *rootDir, char *fname); int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -37,7 +37,6 @@ const char *tsdbFileSuffix[] = { ...@@ -37,7 +37,6 @@ const char *tsdbFileSuffix[] = {
static int compFGroupKey(const void *key, const void *fgroup); static int compFGroupKey(const void *key, const void *fgroup);
static int compFGroup(const void *arg1, const void *arg2); static int compFGroup(const void *arg1, const void *arg2);
static int tsdbWriteFileHead(SFile *pFile);
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid);
STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) { STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) {
...@@ -83,11 +82,21 @@ void tsdbCloseFileH(STsdbFileH *pFileH) { ...@@ -83,11 +82,21 @@ void tsdbCloseFileH(STsdbFileH *pFileH) {
} }
static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) { static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) {
uint32_t version;
char buf[512] = "\0";
tsdbGetFileName(dataDir, fid, suffix, pFile->fname); tsdbGetFileName(dataDir, fid, suffix, pFile->fname);
if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1; if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1;
pFile->fd = -1; pFile->fd = -1;
// TODO: recover the file info if (tsdbOpenFile(pFile, O_RDONLY) < 0) return -1;
// pFile->info = {0};
if (tread(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1;
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) return -1;
void *pBuf = buf;
pBuf = taosDecodeFixed32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
return 0; return 0;
} }
...@@ -284,18 +293,6 @@ static int compFGroup(const void *arg1, const void *arg2) { ...@@ -284,18 +293,6 @@ static int compFGroup(const void *arg1, const void *arg2) {
return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId; return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId;
} }
static int tsdbWriteFileHead(SFile *pFile) {
char head[TSDB_FILE_HEAD_SIZE] = "\0";
pFile->info.size += TSDB_FILE_HEAD_SIZE;
// TODO: write version and File statistic to the head
lseek(pFile->fd, 0, SEEK_SET);
if (write(pFile->fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1;
return 0;
}
int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) { int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) {
if (dataDir == NULL || fname == NULL) return -1; if (dataDir == NULL || fname == NULL) return -1;
...@@ -345,7 +342,9 @@ int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile) ...@@ -345,7 +342,9 @@ int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile)
return -1; return -1;
} }
if (tsdbWriteFileHead(pFile) < 0) { pFile->info.size = TSDB_FILE_HEAD_SIZE;
if (tsdbUpdateFileHeader(pFile, 0) < 0) {
tsdbCloseFile(pFile); tsdbCloseFile(pFile);
return -1; return -1;
} }
......
...@@ -132,10 +132,10 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t ...@@ -132,10 +132,10 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t
// Init block part // Init block part
if (tsdbInitHelperBlock(pHelper) < 0) goto _err; if (tsdbInitHelperBlock(pHelper) < 0) goto _err;
pHelper->blockBuffer = pHelper->pBuffer =
tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols + tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pHelper->config.maxCols +
pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM)); pHelper->config.maxRowSize * pHelper->config.maxRowsPerFileBlock + sizeof(TSCKSUM));
if (pHelper->blockBuffer == NULL) goto _err; if (pHelper->pBuffer == NULL) goto _err;
return 0; return 0;
...@@ -155,7 +155,7 @@ int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { ...@@ -155,7 +155,7 @@ int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
void tsdbDestroyHelper(SRWHelper *pHelper) { void tsdbDestroyHelper(SRWHelper *pHelper) {
if (pHelper) { if (pHelper) {
tzfree(pHelper->blockBuffer); tzfree(pHelper->pBuffer);
tzfree(pHelper->compBuffer); tzfree(pHelper->compBuffer);
tsdbDestroyHelperFile(pHelper); tsdbDestroyHelperFile(pHelper);
tsdbDestroyHelperTable(pHelper); tsdbDestroyHelperTable(pHelper);
...@@ -241,6 +241,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -241,6 +241,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
pHelper->files.headF.fd = -1; pHelper->files.headF.fd = -1;
} }
if (pHelper->files.dataF.fd > 0) { if (pHelper->files.dataF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.dataF), 0);
close(pHelper->files.dataF.fd); close(pHelper->files.dataF.fd);
pHelper->files.dataF.fd = -1; pHelper->files.dataF.fd = -1;
} }
...@@ -249,6 +250,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -249,6 +250,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
pHelper->files.lastF.fd = -1; pHelper->files.lastF.fd = -1;
} }
if (pHelper->files.nHeadF.fd > 0) { if (pHelper->files.nHeadF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0);
close(pHelper->files.nHeadF.fd); close(pHelper->files.nHeadF.fd);
pHelper->files.nHeadF.fd = -1; pHelper->files.nHeadF.fd = -1;
if (hasError) { if (hasError) {
...@@ -260,6 +262,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -260,6 +262,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
} }
if (pHelper->files.nLastF.fd > 0) { if (pHelper->files.nLastF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0);
close(pHelper->files.nLastF.fd); close(pHelper->files.nLastF.fd);
pHelper->files.nLastF.fd = -1; pHelper->files.nLastF.fd = -1;
if (hasError) { if (hasError) {
...@@ -419,7 +422,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -419,7 +422,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
pIdx->uid = pHelper->tableInfo.uid; pIdx->uid = pHelper->tableInfo.uid;
if (pIdx->offset < 0) return -1; if (pIdx->offset < 0) return -1;
ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx)); ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
} }
...@@ -435,7 +438,8 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ...@@ -435,7 +438,8 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
SFile *pFile = &(pHelper->files.nHeadF); SFile *pFile = &(pHelper->files.nHeadF);
pFile->info.offset = offset; pFile->info.offset = offset;
void *buf = pHelper->blockBuffer; // TODO: change the implementation of pHelper->pBuffer
void *buf = pHelper->pBuffer;
for (uint32_t i = 0; i < pHelper->config.maxTables; i++) { for (uint32_t i = 0; i < pHelper->config.maxTables; i++) {
SCompIdx *pCompIdx = pHelper->pCompIdx + i; SCompIdx *pCompIdx = pHelper->pCompIdx + i;
if (pCompIdx->offset > 0) { if (pCompIdx->offset > 0) {
...@@ -444,10 +448,10 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ...@@ -444,10 +448,10 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
} }
} }
int tsize = (char *)buf - (char *)pHelper->blockBuffer + sizeof(TSCKSUM); int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pHelper->blockBuffer, tsize); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);
if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->blockBuffer, tsize) < tsize) return -1; if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) return -1;
pFile->info.len = tsize; pFile->info.len = tsize;
return 0; return 0;
} }
...@@ -465,23 +469,23 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { ...@@ -465,23 +469,23 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE); ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE);
if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) return -1; if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) return -1;
if (tread(fd, (void *)(pHelper->blockBuffer), pFile->info.len) < pFile->info.len) if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len)
return -1; return -1;
if (!taosCheckChecksumWhole((uint8_t *)(pHelper->blockBuffer), pFile->info.len)) { if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
// TODO: File is broken, try to deal with it // TODO: File is broken, try to deal with it
return -1; return -1;
} }
// Decode it // Decode it
void *ptr = pHelper->blockBuffer; void *ptr = pHelper->pBuffer;
while ((char *)ptr - (char *)pHelper->blockBuffer >= pFile->info.len - sizeof(TSCKSUM)) { while ((char *)ptr - (char *)pHelper->pBuffer >= pFile->info.len - sizeof(TSCKSUM)) {
uint32_t tid = 0; uint32_t tid = 0;
if ((ptr = taosDecodeVariant32(ptr, &tid)) == NULL) return -1; if ((ptr = taosDecodeVariant32(ptr, &tid)) == NULL) return -1;
ASSERT(tid > 0 && tid < pHelper->config.maxTables); ASSERT(tid > 0 && tid < pHelper->config.maxTables);
if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1; if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1;
ASSERT((char *)ptr - (char *)pHelper->blockBuffer <= pFile->info.len - sizeof(TSCKSUM)); ASSERT((char *)ptr - (char *)pHelper->pBuffer <= pFile->info.len - sizeof(TSCKSUM));
} }
} }
...@@ -626,9 +630,9 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 ...@@ -626,9 +630,9 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(pCompBlock->numOfSubBlocks <= 1);
ASSERT(tsizeof(pHelper->blockBuffer) >= pCompBlock->len); ASSERT(tsizeof(pHelper->pBuffer) >= pCompBlock->len);
SCompData *pCompData = (SCompData *)pHelper->blockBuffer; SCompData *pCompData = (SCompData *)pHelper->pBuffer;
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err; if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) goto _err;
...@@ -721,7 +725,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -721,7 +725,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints && ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints &&
rowsToWrite <= pHelper->config.maxRowsPerFileBlock); rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
SCompData *pCompData = (SCompData *)(pHelper->blockBuffer); SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
int64_t offset = 0; int64_t offset = 0;
offset = lseek(pFile->fd, 0, SEEK_END); offset = lseek(pFile->fd, 0, SEEK_END);
...@@ -776,7 +780,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -776,7 +780,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
} }
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))( pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->blockBuffer) - lsize, (char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->pBuffer) - lsize,
pHelper->config.compress, pHelper->compBuffer, tsizeof(pHelper->compBuffer)); pHelper->config.compress, pHelper->compBuffer, tsizeof(pHelper->compBuffer));
} else { } else {
pCompCol->len = tlen; pCompCol->len = tlen;
...@@ -1242,11 +1246,8 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { ...@@ -1242,11 +1246,8 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
void *pBuf = (void *)buf; void *pBuf = (void *)buf;
pBuf = taosEncodeFixed32(pBuf, version); pBuf = taosEncodeFixed32(pBuf, version);
pBuf = tsdbEncodeSFileInfo(pBuf, &(pFile->info)); pBuf = tsdbEncodeSFileInfo(pBuf, &(pFile->info));
int tsize = (char *)pBuf - buf + sizeof(TSCKSUM);
ASSERT(tsize <= TSDB_FILE_HEAD_SIZE); taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
taosCalcChecksumAppend(0, (uint8_t *)buf, tsize);
if (lseek(pFile->fd, 0, SEEK_SET) < 0) return -1; if (lseek(pFile->fd, 0, SEEK_SET) < 0) return -1;
if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1; if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册