未验证 提交 22cbd962 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2995 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -151,22 +151,22 @@ typedef struct { ...@@ -151,22 +151,22 @@ typedef struct {
// ------------------ tsdbFile.c // ------------------ tsdbFile.c
extern const char* tsdbFileSuffix[]; extern const char* tsdbFileSuffix[];
typedef enum { typedef enum {
#ifdef TSDB_IDX
TSDB_FILE_TYPE_IDX = 0,
TSDB_FILE_TYPE_HEAD,
#else
TSDB_FILE_TYPE_HEAD = 0, TSDB_FILE_TYPE_HEAD = 0,
#endif
TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_DATA,
TSDB_FILE_TYPE_LAST, TSDB_FILE_TYPE_LAST,
TSDB_FILE_TYPE_MAX, TSDB_FILE_TYPE_STAT,
#ifdef TSDB_IDX
TSDB_FILE_TYPE_NIDX,
#endif
TSDB_FILE_TYPE_NHEAD, TSDB_FILE_TYPE_NHEAD,
TSDB_FILE_TYPE_NLAST TSDB_FILE_TYPE_NDATA,
TSDB_FILE_TYPE_NLAST,
TSDB_FILE_TYPE_NSTAT
} TSDB_FILE_TYPE; } TSDB_FILE_TYPE;
#ifndef TDINTERNAL
#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_LAST+1)
#else
#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_STAT+1)
#endif
typedef struct { typedef struct {
uint32_t magic; uint32_t magic;
uint32_t len; uint32_t len;
...@@ -281,9 +281,6 @@ typedef struct { ...@@ -281,9 +281,6 @@ typedef struct {
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
SFileGroup fGroup; SFileGroup fGroup;
#ifdef TSDB_IDX
SFile nIdxF;
#endif
SFile nHeadF; SFile nHeadF;
SFile nLastF; SFile nLastF;
} SHelperFile; } SHelperFile;
...@@ -497,10 +494,6 @@ void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS ...@@ -497,10 +494,6 @@ void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
#define helperState(h) (h)->state #define helperState(h) (h)->state
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0) #define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
#define helperFileId(h) ((h)->files.fGroup.fileId) #define helperFileId(h) ((h)->files.fGroup.fileId)
#ifdef TSDB_IDX
#define helperIdxF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_IDX]))
#define helperNewIdxF(h) (&((h)->files.nIdxF))
#endif
#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD])) #define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD]))
#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA])) #define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA]))
#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST])) #define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST]))
...@@ -512,7 +505,7 @@ int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); ...@@ -512,7 +505,7 @@ int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
void tsdbDestroyHelper(SRWHelper* pHelper); void tsdbDestroyHelper(SRWHelper* pHelper);
void tsdbResetHelper(SRWHelper* pHelper); void tsdbResetHelper(SRWHelper* pHelper);
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError); int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup);
int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey); int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
......
...@@ -21,11 +21,7 @@ ...@@ -21,11 +21,7 @@
#define TAOS_RANDOM_FILE_FAIL_TEST #define TAOS_RANDOM_FILE_FAIL_TEST
#ifdef TSDB_IDX const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
const char *tsdbFileSuffix[] = {".idx", ".head", ".data", ".last", "", ".i", ".h", ".l"};
#else
const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"};
#endif
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
static void tsdbDestroyFile(SFile *pFile); static void tsdbDestroyFile(SFile *pFile);
...@@ -413,6 +409,10 @@ static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { ...@@ -413,6 +409,10 @@ static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
pBuf = taosDecodeFixedU32(pBuf, &version); pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info)); pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
if (pFile->info.size == TSDB_FILE_HEAD_SIZE) {
pFile->info.size = lseek(pFile->fd, 0, SEEK_END);
}
if (version != TSDB_FILE_VERSION) { if (version != TSDB_FILE_VERSION) {
tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem", tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem",
REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION); REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION);
......
...@@ -679,15 +679,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ...@@ -679,15 +679,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
} }
taosTFree(dataDir); taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 0); tsdbCloseHelperFile(pHelper, 0, pGroup);
pthread_rwlock_wrlock(&(pFileH->fhlock)); pthread_rwlock_wrlock(&(pFileH->fhlock));
#ifdef TSDB_IDX
rename(helperNewIdxF(pHelper)->fname, helperIdxF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_IDX].info = helperNewIdxF(pHelper)->info;
#endif
rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname); rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
...@@ -706,7 +701,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ...@@ -706,7 +701,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
_err: _err:
taosTFree(dataDir); taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 1); tsdbCloseHelperFile(pHelper, 1, NULL);
return -1; return -1;
} }
......
...@@ -91,7 +91,7 @@ void tsdbResetHelper(SRWHelper *pHelper) { ...@@ -91,7 +91,7 @@ void tsdbResetHelper(SRWHelper *pHelper) {
tsdbResetHelperTableImpl(pHelper); tsdbResetHelperTableImpl(pHelper);
// Reset the file part // Reset the file part
tsdbCloseHelperFile(pHelper, false); tsdbCloseHelperFile(pHelper, false, NULL);
tsdbResetHelperFileImpl(pHelper); tsdbResetHelperFileImpl(pHelper);
pHelper->state = TSDB_HELPER_CLEAR_STATE; pHelper->state = TSDB_HELPER_CLEAR_STATE;
...@@ -110,31 +110,16 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ...@@ -110,31 +110,16 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// Set the files // Set the files
pHelper->files.fGroup = *pGroup; pHelper->files.fGroup = *pGroup;
if (helperType(pHelper) == TSDB_WRITE_HELPER) { if (helperType(pHelper) == TSDB_WRITE_HELPER) {
#ifdef TSDB_IDX
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NIDX, helperNewIdxF(pHelper)->fname);
#endif
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname); tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname);
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname); tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname);
} }
// Open the files // Open the files
#ifdef TSDB_IDX
if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) return -1;
#endif
if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) return -1; if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) return -1;
if (helperType(pHelper) == TSDB_WRITE_HELPER) { if (helperType(pHelper) == TSDB_WRITE_HELPER) {
if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) return -1; if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) return -1;
if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) return -1; if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) return -1;
#ifdef TSDB_IDX
// Create and open .i file
pFile = helperNewIdxF(pHelper);
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
#endif
// Create and open .h // Create and open .h
pFile = helperNewHeadF(pHelper); pFile = helperNewHeadF(pHelper);
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1; if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
...@@ -161,14 +146,9 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ...@@ -161,14 +146,9 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
return 0; return 0;
} }
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError, SFileGroup *pGroup) {
SFile *pFile = NULL; SFile *pFile = NULL;
#ifdef TSDB_IDX
pFile = helperIdxF(pHelper);
tsdbCloseFile(pFile);
#endif
pFile = helperHeadF(pHelper); pFile = helperHeadF(pHelper);
tsdbCloseFile(pFile); tsdbCloseFile(pFile);
...@@ -177,10 +157,11 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -177,10 +157,11 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (helperType(pHelper) == TSDB_WRITE_HELPER) { if (helperType(pHelper) == TSDB_WRITE_HELPER) {
if (!hasError) { if (!hasError) {
tsdbUpdateFileHeader(pFile); tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
} else { } else {
// TODO: shrink back to origin ASSERT(pGroup != NULL);
taosFtruncate(pFile->fd, pGroup->files[TSDB_FILE_TYPE_DATA].info.size);
} }
fsync(pFile->fd);
} }
tsdbCloseFile(pFile); tsdbCloseFile(pFile);
} }
...@@ -190,27 +171,16 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { ...@@ -190,27 +171,16 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) { if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) {
if (!hasError) { if (!hasError) {
tsdbUpdateFileHeader(pFile); tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
} else { } else {
// TODO: shrink back to origin ASSERT(pGroup != NULL);
taosFtruncate(pFile->fd, pGroup->files[TSDB_FILE_TYPE_LAST].info.size);
} }
fsync(pFile->fd);
} }
tsdbCloseFile(pFile); tsdbCloseFile(pFile);
} }
if (helperType(pHelper) == TSDB_WRITE_HELPER) { if (helperType(pHelper) == TSDB_WRITE_HELPER) {
#ifdef TSDB_IDX
pFile = helperNewIdxF(pHelper);
if (pFile->fd > 0) {
if (!hasError) {
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
}
tsdbCloseFile(pFile);
if (hasError) (void)remove(pFile->fname);
}
#endif
pFile = helperNewHeadF(pHelper); pFile = helperNewHeadF(pHelper);
if (pFile->fd > 0) { if (pFile->fd > 0) {
if (!hasError) { if (!hasError) {
...@@ -412,10 +382,6 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -412,10 +382,6 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
return -1; return -1;
} }
#ifdef TSDB_IDX
pFile = helperNewIdxF(pHelper);
#endif
if (taosTSizeof(pHelper->pWIdx) < pFile->info.len + sizeof(SCompIdx) + 12) { if (taosTSizeof(pHelper->pWIdx) < pFile->info.len + sizeof(SCompIdx) + 12) {
pHelper->pWIdx = taosTRealloc(pHelper->pWIdx, taosTSizeof(pHelper->pWIdx) == 0 ? 1024 : taosTSizeof(pHelper->pWIdx) * 2); pHelper->pWIdx = taosTRealloc(pHelper->pWIdx, taosTSizeof(pHelper->pWIdx) == 0 ? 1024 : taosTSizeof(pHelper->pWIdx) * 2);
if (pHelper->pWIdx == NULL) { if (pHelper->pWIdx == NULL) {
...@@ -426,6 +392,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -426,6 +392,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len); void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len);
pFile->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx)); pFile->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx));
pFile->info.size += pIdx->len;
// ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
} }
return 0; return 0;
...@@ -435,11 +404,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ...@@ -435,11 +404,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER); ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
off_t offset = 0; off_t offset = 0;
#ifdef TSDB_IDX
SFile *pFile = helperNewIdxF(pHelper);
#else
SFile *pFile = helperNewHeadF(pHelper); SFile *pFile = helperNewHeadF(pHelper);
#endif
pFile->info.len += sizeof(TSCKSUM); pFile->info.len += sizeof(TSCKSUM);
if (taosTSizeof(pHelper->pWIdx) < pFile->info.len) { if (taosTSizeof(pHelper->pWIdx) < pFile->info.len) {
...@@ -460,7 +425,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ...@@ -460,7 +425,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
return -1; return -1;
} }
pFile->info.offset = offset; ASSERT(offset == pFile->info.size);
if (taosTWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < (int)pFile->info.len) { if (taosTWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < (int)pFile->info.len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
...@@ -469,16 +434,16 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ...@@ -469,16 +434,16 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
return -1; return -1;
} }
pFile->info.offset = offset;
pFile->info.size += pFile->info.len;
// ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
return 0; return 0;
} }
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN); ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN);
#ifdef TSDB_IDX
SFile *pFile = helperIdxF(pHelper);
#else
SFile *pFile = helperHeadF(pHelper); SFile *pFile = helperHeadF(pHelper);
#endif
int fd = pFile->fd; int fd = pFile->fd;
if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) { if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
...@@ -847,6 +812,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -847,6 +812,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
(int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst, (int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst,
pCompBlock->keyLast); pCompBlock->keyLast);
pFile->info.size += pCompBlock->len;
// ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
return 0; return 0;
_err: _err:
...@@ -1052,10 +1020,6 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) { ...@@ -1052,10 +1020,6 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
helperLastF(pHelper)->fd = -1; helperLastF(pHelper)->fd = -1;
helperNewHeadF(pHelper)->fd = -1; helperNewHeadF(pHelper)->fd = -1;
helperNewLastF(pHelper)->fd = -1; helperNewLastF(pHelper)->fd = -1;
#ifdef TSDB_IDX
helperIdxF(pHelper)->fd = -1;
helperNewIdxF(pHelper)->fd = -1;
#endif
} }
static int tsdbInitHelperFile(SRWHelper *pHelper) { static int tsdbInitHelperFile(SRWHelper *pHelper) {
...@@ -1064,7 +1028,7 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) { ...@@ -1064,7 +1028,7 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) {
} }
static void tsdbDestroyHelperFile(SRWHelper *pHelper) { static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
tsdbCloseHelperFile(pHelper, false); tsdbCloseHelperFile(pHelper, false, NULL);
tsdbResetHelperFileImpl(pHelper); tsdbResetHelperFileImpl(pHelper);
taosTZfree(pHelper->idxH.pIdxArray); taosTZfree(pHelper->idxH.pIdxArray);
taosTZfree(pHelper->pWIdx); taosTZfree(pHelper->pWIdx);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册