提交 38b8f4ef 编写于 作者: H Hongze Cheng

TD-611

上级 dd31a82c
...@@ -173,9 +173,14 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -173,9 +173,14 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
close(pHelper->files.nHeadF.fd); close(pHelper->files.nHeadF.fd);
pHelper->files.nHeadF.fd = -1; pHelper->files.nHeadF.fd = -1;
if (hasError) { if (hasError) {
remove(pHelper->files.nHeadF.fname); (void)remove(pHelper->files.nHeadF.fname);
} else { } else {
rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname); if (rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname) < 0) {
tsdbError("vgId:%d failed to rename file from %s to %s since %s", REPO_ID(pHelper->pRepo),
pHelper->files.nHeadF.fname, pHelper->files.headF.fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pHelper->files.headF.info = pHelper->files.nHeadF.info; pHelper->files.headF.info = pHelper->files.nHeadF.info;
} }
} }
...@@ -186,9 +191,14 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -186,9 +191,14 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
close(pHelper->files.nLastF.fd); close(pHelper->files.nLastF.fd);
pHelper->files.nLastF.fd = -1; pHelper->files.nLastF.fd = -1;
if (hasError) { if (hasError) {
remove(pHelper->files.nLastF.fname); (void)remove(pHelper->files.nLastF.fname);
} else { } else {
rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname); if (rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname) < 0) {
tsdbError("vgId:%d failed to rename file from %s to %s since %s", REPO_ID(pHelper->pRepo),
pHelper->files.nLastF.fname, pHelper->files.lastF.fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pHelper->files.lastF.info = pHelper->files.nLastF.info; pHelper->files.lastF.info = pHelper->files.nLastF.info;
} }
} }
...@@ -306,8 +316,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ...@@ -306,8 +316,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
if (pCompBlock->numOfSubBlocks > 1) { if (pCompBlock->numOfSubBlocks > 1) {
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0) pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
return -1; return -1;
...@@ -330,14 +339,27 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ...@@ -330,14 +339,27 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
} }
int tsdbWriteCompInfo(SRWHelper *pHelper) { int tsdbWriteCompInfo(SRWHelper *pHelper) {
off_t offset = 0;
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) { if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
if (pIdx->offset > 0) { if (pIdx->offset > 0) {
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
if (pIdx->offset < 0) return -1; if (offset < 0) {
tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pIdx->offset = offset;
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) return -1; if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) {
tsdbError("vgId:%d failed to send %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
} }
} else { } else {
pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER; pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
...@@ -345,12 +367,23 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -345,12 +367,23 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pHelper->pCompInfo->checksum = 0; pHelper->pCompInfo->checksum = 0;
ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pIdx->offset = offset;
pIdx->uid = pHelper->tableInfo.uid; pIdx->uid = pHelper->tableInfo.uid;
if (pIdx->offset < 0) return -1;
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1; if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
pHelper->files.nHeadF.fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
} }
return 0; return 0;
...@@ -567,24 +600,24 @@ _err: ...@@ -567,24 +600,24 @@ _err:
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
ASSERT(pHelper->files.lastF.fd > 0); ASSERT(pHelper->files.lastF.fd > 0);
struct stat st; struct stat st;
fstat(pHelper->files.lastF.fd, &st); if (fstat(pHelper->files.lastF.fd, &st) < 0) return true;
if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true; if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true;
return false; return false;
} }
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) { SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) {
STsdbCfg *pCfg = &(pHelper->pRepo->config); STsdbCfg * pCfg = &(pHelper->pRepo->config);
SCompData *pCompData = (SCompData *)(pHelper->pBuffer); SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
int64_t offset = 0; int64_t offset = 0;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pCfg->maxRowsPerFileBlock); ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true); ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true);
offset = lseek(pFile->fd, 0, SEEK_END); offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) { if (offset < 0) {
tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno)); tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
...@@ -639,9 +672,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -639,9 +672,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
} }
} }
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))( pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->pBuffer) - lsize, pCfg->compression, tsizeof(pHelper->pBuffer) - lsize, pCfg->compression,
pHelper->compBuffer, tsizeof(pHelper->compBuffer)); pHelper->compBuffer, tsizeof(pHelper->compBuffer));
} else { } else {
pCompCol->len = tlen; pCompCol->len = tlen;
memcpy(tptr, pDataCol->pData, pCompCol->len); memcpy(tptr, pDataCol->pData, pCompCol->len);
...@@ -725,8 +758,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -725,8 +758,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); // ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock && ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks - 1);
blkIdx == pIdx->numOfBlocks - 1);
int defaultRowsToWrite = pCfg->maxRowsPerFileBlock * 4 / 5; // TODO: make a interface int defaultRowsToWrite = pCfg->maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows); rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
...@@ -1051,7 +1083,7 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { ...@@ -1051,7 +1083,7 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
static int tsdbInitHelperFile(SRWHelper *pHelper) { static int tsdbInitHelperFile(SRWHelper *pHelper) {
STsdbCfg *pCfg = &pHelper->pRepo->config; STsdbCfg *pCfg = &pHelper->pRepo->config;
size_t tsize = sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM); size_t tsize = sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM);
pHelper->pCompIdx = (SCompIdx *)tmalloc(tsize); pHelper->pCompIdx = (SCompIdx *)tmalloc(tsize);
if (pHelper->pCompIdx == NULL) { if (pHelper->pCompIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
...@@ -1099,10 +1131,8 @@ static int tsdbInitHelperBlock(SRWHelper *pHelper) { ...@@ -1099,10 +1131,8 @@ static int tsdbInitHelperBlock(SRWHelper *pHelper) {
STsdbRepo *pRepo = helperRepo(pHelper); STsdbRepo *pRepo = helperRepo(pHelper);
STsdbMeta *pMeta = pHelper->pRepo->tsdbMeta; STsdbMeta *pMeta = pHelper->pRepo->tsdbMeta;
pHelper->pDataCols[0] = pHelper->pDataCols[0] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); pHelper->pDataCols[1] = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
pHelper->pDataCols[1] =
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) { if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
...@@ -1222,12 +1252,16 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 ...@@ -1222,12 +1252,16 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) { static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
ASSERT(pCompBlock->numOfSubBlocks <= 1); ASSERT(pCompBlock->numOfSubBlocks <= 1);
ASSERT(tsizeof(pHelper->pBuffer) >= pCompBlock->len);
SCompData *pCompData = (SCompData *)pHelper->pBuffer; SCompData *pCompData = (SCompData *)pHelper->pBuffer;
SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF); SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
pHelper->pBuffer = trealloc(pHelper->pBuffer, pCompBlock->len);
if (pHelper->pBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
int fd = pFile->fd; int fd = pFile->fd;
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) { if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) {
tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, tsdbError("vgId:%d tid:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
......
...@@ -78,8 +78,8 @@ int tdCreateKVStore(char *fname) { ...@@ -78,8 +78,8 @@ int tdCreateKVStore(char *fname) {
return 0; return 0;
_err: _err:
if (fd > 0) close(fd); if (fd >= 0) close(fd);
remove(fname); (void)remove(fname);
return -1; return -1;
} }
...@@ -106,15 +106,15 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH ...@@ -106,15 +106,15 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
goto _err; goto _err;
} }
if (access(pStore->fsnap, F_OK) == 0) { // .snap file exists pStore->sfd = open(pStore->fsnap, O_RDONLY);
uTrace("file %s exists, try to recover the KV store", pStore->fsnap); if (pStore->sfd < 0) {
pStore->sfd = open(pStore->fsnap, O_RDONLY); if (errno != ENOENT) {
if (pStore->sfd < 0) {
uError("failed to open file %s since %s", pStore->fsnap, strerror(errno)); uError("failed to open file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
} else {
uTrace("file %s exists, try to recover the KV store", pStore->fsnap);
if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) { if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) {
if (terrno != TSDB_CODE_COM_FILE_CORRUPTED) goto _err; if (terrno != TSDB_CODE_COM_FILE_CORRUPTED) goto _err;
} else { } else {
...@@ -133,7 +133,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH ...@@ -133,7 +133,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
close(pStore->sfd); close(pStore->sfd);
pStore->sfd = -1; pStore->sfd = -1;
remove(pStore->fsnap); (void)remove(pStore->fsnap);
} }
if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
...@@ -212,7 +212,7 @@ _err: ...@@ -212,7 +212,7 @@ _err:
if (pStore->sfd > 0) { if (pStore->sfd > 0) {
close(pStore->sfd); close(pStore->sfd);
pStore->sfd = -1; pStore->sfd = -1;
remove(pStore->fsnap); (void)remove(pStore->fsnap);
} }
if (pStore->fd > 0) { if (pStore->fd > 0) {
close(pStore->fd); close(pStore->fd);
...@@ -314,7 +314,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) { ...@@ -314,7 +314,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
} }
pStore->fd = -1; pStore->fd = -1;
remove(pStore->fsnap); (void)remove(pStore->fsnap);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册