未验证 提交 8948dfb3 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1490 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -78,7 +78,7 @@ typedef struct STable { ...@@ -78,7 +78,7 @@ typedef struct STable {
void * tsdbEncodeTable(STable *pTable, int *contLen); void * tsdbEncodeTable(STable *pTable, int *contLen);
STable *tsdbDecodeTable(void *cont, int contLen); STable *tsdbDecodeTable(void *cont, int contLen);
void * tsdbFreeEncode(void *cont); void tsdbFreeEncode(void *cont);
// ---------- TSDB META HANDLE DEFINITION // ---------- TSDB META HANDLE DEFINITION
typedef struct { typedef struct {
...@@ -97,7 +97,7 @@ typedef struct { ...@@ -97,7 +97,7 @@ typedef struct {
int maxCols; int maxCols;
} STsdbMeta; } STsdbMeta;
STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables); STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables);
int32_t tsdbFreeMeta(STsdbMeta *pMeta); int32_t tsdbFreeMeta(STsdbMeta *pMeta);
STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable); STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
...@@ -216,7 +216,7 @@ typedef struct { ...@@ -216,7 +216,7 @@ typedef struct {
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles);
void tsdbCloseFileH(STsdbFileH *pFileH); void tsdbCloseFileH(STsdbFileH *pFileH);
int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose); int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose);
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
int tsdbOpenFile(SFile *pFile, int oflag); int tsdbOpenFile(SFile *pFile, int oflag);
int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
......
...@@ -72,7 +72,7 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) { ...@@ -72,7 +72,7 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) {
if (bytes > pCache->cacheBlockSize) return NULL; if (bytes > pCache->cacheBlockSize) return NULL;
if (pCache->curBlock == NULL || pCache->curBlock->remain < bytes) { if (pCache->curBlock == NULL || pCache->curBlock->remain < bytes) {
if (pCache->curBlock !=NULL && (pCache->mem->list) >= pCache->totalCacheBlocks/2) { if (pCache->curBlock !=NULL && listNEles(pCache->mem->list) >= pCache->totalCacheBlocks/2) {
tsdbTriggerCommit(pCache->pRepo); tsdbTriggerCommit(pCache->pRepo);
} }
...@@ -130,7 +130,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { ...@@ -130,7 +130,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
if (pCache->mem == NULL) { // Create a new one if (pCache->mem == NULL) { // Create a new one
pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem)); pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem));
if (pCache->mem == NULL) return NULL; if (pCache->mem == NULL) return -1;
pCache->mem->keyFirst = INT64_MAX; pCache->mem->keyFirst = INT64_MAX;
pCache->mem->keyLast = 0; pCache->mem->keyLast = 0;
pCache->mem->numOfPoints = 0; pCache->mem->numOfPoints = 0;
......
...@@ -33,7 +33,7 @@ const char *tsdbFileSuffix[] = { ...@@ -33,7 +33,7 @@ const char *tsdbFileSuffix[] = {
static int compFGroupKey(const void *key, const void *fgroup); static int compFGroupKey(const void *key, const void *fgroup);
static int compFGroup(const void *arg1, const void *arg2); static int compFGroup(const void *arg1, const void *arg2);
static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname); static int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname);
static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteFileHead(SFile *pFile);
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid);
...@@ -53,8 +53,6 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { ...@@ -53,8 +53,6 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
} }
struct dirent *dp = NULL; struct dirent *dp = NULL;
int fid = 0;
SFileGroup fGroup = {0};
while ((dp = readdir(dir)) != NULL) { while ((dp = readdir(dir)) != NULL) {
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue; if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue;
int fid = 0; int fid = 0;
...@@ -70,7 +68,7 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { ...@@ -70,7 +68,7 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); } void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); }
static int tsdbInitFile(char *dataDir, int fid, char *suffix, SFile *pFile) { static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) {
tsdbGetFileName(dataDir, fid, suffix, pFile->fname); tsdbGetFileName(dataDir, fid, suffix, pFile->fname);
if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1; if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1;
pFile->fd = -1; pFile->fd = -1;
...@@ -82,7 +80,6 @@ static int tsdbInitFile(char *dataDir, int fid, char *suffix, SFile *pFile) { ...@@ -82,7 +80,6 @@ static int tsdbInitFile(char *dataDir, int fid, char *suffix, SFile *pFile) {
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) { static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) {
if (tsdbSearchFGroup(pFileH, fid) != NULL) return 0; if (tsdbSearchFGroup(pFileH, fid) != NULL) return 0;
char fname[128] = "\0";
SFileGroup fGroup = {0}; SFileGroup fGroup = {0};
fGroup.fileId = fid; fGroup.fileId = fid;
...@@ -163,8 +160,8 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { ...@@ -163,8 +160,8 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
SFileGroup *ret = pIter->pFileGroup; SFileGroup *ret = pIter->pFileGroup;
if (ret == NULL) return NULL; if (ret == NULL) return NULL;
if (pIter->direction = TSDB_FGROUP_ITER_FORWARD) { if (pIter->direction == TSDB_FGROUP_ITER_FORWARD) {
if (pIter->pFileGroup + 1 == pIter->base + pIter->numOfFGroups) { if ((pIter->pFileGroup + 1) == (pIter->base + pIter->numOfFGroups)) {
pIter->pFileGroup = NULL; pIter->pFileGroup = NULL;
} else { } else {
pIter->pFileGroup += 1; pIter->pFileGroup += 1;
...@@ -223,7 +220,7 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf ...@@ -223,7 +220,7 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf
if (pCompData == NULL) return -1; if (pCompData == NULL) return -1;
// Load data from the block // Load data from the block
if (tsdbLoadDataBlock(pOutFile, pStartBlock, numOfBlocks, pCols, pCompData)); // if (tsdbLoadDataBlock(pOutFile, pStartBlock, numOfBlocks, pCols, pCompData));
// Write data block to the file // Write data block to the file
{ {
...@@ -315,7 +312,7 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { ...@@ -315,7 +312,7 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
return 0; return 0;
} }
static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname) { static int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) {
if (dataDir == NULL || fname == NULL) return -1; if (dataDir == NULL || fname == NULL) return -1;
sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix); sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix);
...@@ -348,7 +345,7 @@ SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) { ...@@ -348,7 +345,7 @@ SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) {
return pGroup; return pGroup;
} }
int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose) { int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose) {
memset((void *)pFile, 0, sizeof(SFile)); memset((void *)pFile, 0, sizeof(SFile));
pFile->fd = -1; pFile->fd = -1;
......
...@@ -806,7 +806,7 @@ static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) { ...@@ -806,7 +806,7 @@ static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) {
} }
static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) { static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) {
SSkipListIterator **iters = (SSkipListIterator *)calloc(maxTables, sizeof(SSkipListIterator *)); SSkipListIterator **iters = (SSkipListIterator **)calloc(maxTables, sizeof(SSkipListIterator *));
if (iters == NULL) return NULL; if (iters == NULL) return NULL;
for (int tid = 0; tid < maxTables; tid++) { for (int tid = 0; tid < maxTables; tid++) {
...@@ -893,9 +893,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -893,9 +893,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
SFileGroup *pGroup = NULL; SFileGroup *pGroup = NULL;
SCompIdx * pIndices = NULL; SCompIdx * pIndices = NULL;
SCompInfo * pCompInfo = NULL; SCompInfo * pCompInfo = NULL;
size_t compInfoSize = 0; // size_t compInfoSize = 0;
SCompBlock compBlock; // SCompBlock compBlock;
SCompBlock *pBlock = &compBlock; // SCompBlock *pBlock = &compBlock;
TSKEY minKey = 0, maxKey = 0; TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
...@@ -963,10 +963,11 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -963,10 +963,11 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks); TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks);
SCompBlock tBlock; SCompBlock tBlock;
int64_t toffset, tlen; int64_t toffset;
int32_t tlen;
tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock); tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock);
tsdbWriteBlockToFileImpl(&lFile, pCols, pCols->numOfPoints, &toffset, tlen, pTable->tableId.uid); tsdbWriteBlockToFileImpl(&lFile, pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid);
pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks); pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks);
pTBlock->offset = toffset; pTBlock->offset = toffset;
pTBlock->len = tlen; pTBlock->len = tlen;
...@@ -1164,7 +1165,6 @@ static int compareKeyBlock(const void *arg1, const void *arg2) { ...@@ -1164,7 +1165,6 @@ static int compareKeyBlock(const void *arg1, const void *arg2) {
int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) { int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) {
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
SCompData *pCompData = NULL;
SFile * pFile = NULL; SFile * pFile = NULL;
int numOfPointsToWrite = 0; int numOfPointsToWrite = 0;
int64_t offset = 0; int64_t offset = 0;
......
...@@ -89,7 +89,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) { ...@@ -89,7 +89,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
return pTable; return pTable;
} }
void *tsdbFreeEncode(void *cont) { void tsdbFreeEncode(void *cont) {
if (cont != NULL) free(cont); if (cont != NULL) free(cont);
} }
...@@ -124,7 +124,7 @@ void tsdbOrgMeta(void *pHandle) { ...@@ -124,7 +124,7 @@ void tsdbOrgMeta(void *pHandle) {
* Initialize the meta handle * Initialize the meta handle
* ASSUMPTIONS: VALID PARAMETER * ASSUMPTIONS: VALID PARAMETER
*/ */
STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables) { STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) {
STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta)); STsdbMeta *pMeta = (STsdbMeta *)malloc(sizeof(STsdbMeta));
if (pMeta == NULL) return NULL; if (pMeta == NULL) return NULL;
......
...@@ -174,7 +174,7 @@ int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co ...@@ -174,7 +174,7 @@ int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
void tsdbCloseMetaFile(SMetaFile *mfh) { void tsdbCloseMetaFile(SMetaFile *mfh) {
if (mfh == NULL) return; if (mfh == NULL) return;
close(mfh); close(mfh->fd);
taosHashCleanup(mfh->map); taosHashCleanup(mfh->map);
} }
...@@ -210,7 +210,7 @@ static int tsdbCreateMetaFile(char *fname) { ...@@ -210,7 +210,7 @@ static int tsdbCreateMetaFile(char *fname) {
if (tsdbWriteMetaHeader(fd) < 0) { if (tsdbWriteMetaHeader(fd) < 0) {
close(fd); close(fd);
return NULL; return -1;
} }
return fd; return fd;
...@@ -242,7 +242,7 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) { ...@@ -242,7 +242,7 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) {
mfh->fd = fd; mfh->fd = fd;
void *buf = NULL; void *buf = NULL;
int buf_size = 0; // int buf_size = 0;
SRecordInfo info; SRecordInfo info;
while (1) { while (1) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册