提交 84bd2fb7 编写于 作者: H Haojun Liao

Merge branch 'enh/tsdb_optimize' of github.com:taosdata/tdengine into enh/tsdb_optimize

...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
#ifndef _TD_VNODE_TSDB_H_ #ifndef _TD_VNODE_TSDB_H_
#define _TD_VNODE_TSDB_H_ #define _TD_VNODE_TSDB_H_
//#include "../tsdb/tsdbFile2.h" // #include "../tsdb/tsdbFile2.h"
//#include "../tsdb/tsdbMerge.h" // #include "../tsdb/tsdbMerge.h"
//#include "../tsdb/tsdbSttFileRW.h" // #include "../tsdb/tsdbSttFileRW.h"
#include "tsimplehash.h" #include "tsimplehash.h"
#include "vnodeInt.h" #include "vnodeInt.h"
...@@ -703,20 +703,20 @@ typedef struct { ...@@ -703,20 +703,20 @@ typedef struct {
typedef struct SSttBlockLoadInfo { typedef struct SSttBlockLoadInfo {
SBlockData blockData[2]; SBlockData blockData[2];
void* pBlockArray; void *pBlockArray;
SArray *aSttBlk; SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex; int32_t currentLoadBlockIndex;
int32_t loadBlocks; int32_t loadBlocks;
double elapsedTime; double elapsedTime;
STSchema *pSchema; STSchema *pSchema;
int16_t *colIds; int16_t *colIds;
int32_t numOfCols; int32_t numOfCols;
bool checkRemainingRow; bool checkRemainingRow;
bool isLast; bool isLast;
bool sttBlockLoaded; bool sttBlockLoaded;
int32_t numOfStt; int32_t numOfStt;
// keep the last access position, this position may be used to reduce the binary times for // keep the last access position, this position may be used to reduce the binary times for
// starting last block data for a new table // starting last block data for a new table
...@@ -775,19 +775,19 @@ struct SDiskDataBuilder { ...@@ -775,19 +775,19 @@ struct SDiskDataBuilder {
}; };
typedef struct SLDataIter { typedef struct SLDataIter {
SRBTreeNode node; SRBTreeNode node;
SSttBlk *pSttBlk; SSttBlk *pSttBlk;
int32_t iStt; int32_t iStt;
int8_t backward; int8_t backward;
int32_t iSttBlk; int32_t iSttBlk;
int32_t iRow; int32_t iRow;
SRowInfo rInfo; SRowInfo rInfo;
uint64_t uid; uint64_t uid;
STimeWindow timeWindow; STimeWindow timeWindow;
SVersionRange verRange; SVersionRange verRange;
SSttBlockLoadInfo *pBlockLoadInfo; SSttBlockLoadInfo *pBlockLoadInfo;
bool ignoreEarlierTs; bool ignoreEarlierTs;
struct SSttFileReader* pReader; struct SSttFileReader *pReader;
} SLDataIter; } SLDataIter;
#define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row)) #define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row))
...@@ -795,21 +795,21 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead ...@@ -795,21 +795,21 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter); bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter);
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo, STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter, void* pCurrentFileSet); bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter,
void *pCurrentFileSet);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt); SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el); void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void destroySttBlockReader(SLDataIter* pLDataIter, int32_t numOfIter); void destroySttBlockReader(SLDataIter *pLDataIter, int32_t numOfIter);
// tsdbCache ============================================================================================== // tsdbCache ==============================================================================================
typedef struct SCacheRowsReader { typedef struct SCacheRowsReader {
......
...@@ -142,10 +142,10 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { ...@@ -142,10 +142,10 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
if (record->ekey < committer->ctx->minKey) { if (record->ekey < committer->ctx->minKey) {
continue; goto _next;
} else if (record->skey > committer->ctx->maxKey) { } else if (record->skey > committer->ctx->maxKey) {
committer->ctx->maxKey = TMIN(record->skey, committer->ctx->maxKey); committer->ctx->maxKey = TMIN(record->skey, committer->ctx->maxKey);
continue; goto _next;
} }
if (record->ekey > committer->ctx->maxKey) { if (record->ekey > committer->ctx->maxKey) {
...@@ -158,6 +158,7 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { ...@@ -158,6 +158,7 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
code = tsdbFSetWriteTombRecord(committer->writer, record); code = tsdbFSetWriteTombRecord(committer->writer, record);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_next:
code = tsdbIterMergerNext(committer->tombIterMerger); code = tsdbIterMergerNext(committer->tombIterMerger);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
......
...@@ -269,7 +269,7 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) { ...@@ -269,7 +269,7 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) {
return 0; return 0;
} }
static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) { void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
if (pVnode->pTfs) { if (pVnode->pTfs) {
if (current) { if (current) {
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "tsdbFS2.h" #include "tsdbFS2.h"
#include "tsdbUpgrade.h"
extern int vnodeScheduleTask(int (*execute)(void *), void *arg); extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
...@@ -28,12 +29,6 @@ enum { ...@@ -28,12 +29,6 @@ enum {
TSDB_FS_STATE_CLOSE, TSDB_FS_STATE_CLOSE,
}; };
typedef enum {
TSDB_FCURRENT = 1,
TSDB_FCURRENT_C, // for commit
TSDB_FCURRENT_M, // for merge
} EFCurrentT;
static const char *gCurrentFname[] = { static const char *gCurrentFname[] = {
[TSDB_FCURRENT] = "current.json", [TSDB_FCURRENT] = "current.json",
[TSDB_FCURRENT_C] = "current.c.json", [TSDB_FCURRENT_C] = "current.c.json",
...@@ -73,7 +68,7 @@ static int32_t destroy_fs(STFileSystem **fs) { ...@@ -73,7 +68,7 @@ static int32_t destroy_fs(STFileSystem **fs) {
return 0; return 0;
} }
static int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) { int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
if (pTsdb->pVnode->pTfs) { if (pTsdb->pVnode->pTfs) {
snprintf(fname, // snprintf(fname, //
TSDB_FILENAME_LEN, // TSDB_FILENAME_LEN, //
...@@ -161,7 +156,7 @@ _exit: ...@@ -161,7 +156,7 @@ _exit:
return code; return code;
} }
static int32_t save_fs(const TFileSetArray *arr, const char *fname) { int32_t save_fs(const TFileSetArray *arr, const char *fname) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -375,11 +370,6 @@ static int32_t tsdbFSScanAndFix(STFileSystem *fs) { ...@@ -375,11 +370,6 @@ static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
return 0; return 0;
} }
static int32_t update_fs_if_needed(STFileSystem *pFS) {
// TODO
return 0;
}
static int32_t tsdbFSDupState(STFileSystem *fs) { static int32_t tsdbFSDupState(STFileSystem *fs) {
int32_t code; int32_t code;
...@@ -405,9 +395,6 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) { ...@@ -405,9 +395,6 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
int32_t lino = 0; int32_t lino = 0;
STsdb *pTsdb = fs->tsdb; STsdb *pTsdb = fs->tsdb;
code = update_fs_if_needed(fs);
TSDB_CHECK_CODE(code, lino, _exit);
char fCurrent[TSDB_FILENAME_LEN]; char fCurrent[TSDB_FILENAME_LEN];
char cCurrent[TSDB_FILENAME_LEN]; char cCurrent[TSDB_FILENAME_LEN];
char mCurrent[TSDB_FILENAME_LEN]; char mCurrent[TSDB_FILENAME_LEN];
...@@ -483,10 +470,13 @@ static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSe ...@@ -483,10 +470,13 @@ static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSe
} }
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) { static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
TFileSetArray *fsetArray = fs->fSetArrTmp;
code = tsdbFSDupState(fs);
if (code) return code;
TFileSetArray *fsetArray = fs->fSetArrTmp;
STFileSet *fset = NULL; STFileSet *fset = NULL;
const STFileOp *op; const STFileOp *op;
TARRAY2_FOREACH_PTR(opArray, op) { TARRAY2_FOREACH_PTR(opArray, op) {
...@@ -527,6 +517,9 @@ int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) { ...@@ -527,6 +517,9 @@ int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback) {
int32_t code; int32_t code;
int32_t lino; int32_t lino;
code = tsdbCheckAndUpgradeFileSystem(pTsdb, rollback);
TSDB_CHECK_CODE(code, lino, _exit);
code = create_fs(pTsdb, fs); code = create_fs(pTsdb, fs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
......
...@@ -38,6 +38,12 @@ typedef enum { ...@@ -38,6 +38,12 @@ typedef enum {
TSDB_BG_TASK_COMPACT, TSDB_BG_TASK_COMPACT,
} EFSBgTaskT; } EFSBgTaskT;
typedef enum {
TSDB_FCURRENT = 1,
TSDB_FCURRENT_C, // for commit
TSDB_FCURRENT_M, // for merge
} EFCurrentT;
/* Exposed APIs */ /* Exposed APIs */
// open/close // open/close
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback); int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbUpgrade.h"
// old
extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t);
// new
extern int32_t save_fs(const TFileSetArray *arr, const char *fname);
extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArray *fileSetArray) {
int32_t code = 0;
int32_t lino = 0;
SDataFReader *reader;
code = tsdbDataFReaderOpen(&reader, tsdb, pDFileSet);
TSDB_CHECK_CODE(code, lino, _exit);
// .head
{
SArray *aBlockIdx = NULL;
SMapData mDataBlk[1] = {0};
SBrinBlock brinBlock[1] = {0};
TBrinBlkArray brinBlkArray[1] = {0};
if ((aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbReadBlockIdx(reader, aBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) {
SBlockIdx *pBlockIdx = taosArrayGet(aBlockIdx, i);
code = tsdbReadDataBlk(reader, pBlockIdx, mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t j = 0; j < mDataBlk->nItem; ++j) {
SDataBlk dataBlk[1];
tMapDataGetItemByIdx(mDataBlk, j, dataBlk, tGetDataBlk);
SBrinRecord record = {
.suid = pBlockIdx->suid,
.uid = pBlockIdx->uid,
.firstKey = dataBlk->minKey.ts,
.firstKeyVer = dataBlk->minKey.version,
.lastKey = dataBlk->maxKey.ts,
.lastKeyVer = dataBlk->maxKey.version,
.minVer = dataBlk->minVer,
.maxVer = dataBlk->maxVer,
.blockOffset = dataBlk->aSubBlock->offset,
.smaOffset = dataBlk->smaInfo.offset,
.blockSize = dataBlk->aSubBlock->szBlock,
.blockKeySize = dataBlk->aSubBlock->szKey,
.smaSize = dataBlk->smaInfo.size,
.numRow = dataBlk->nRow,
.count = dataBlk->nRow,
};
if (dataBlk->hasDup) {
ASSERT(0);
// TODO: need to get count
// record.count = 0;
}
code = tBrinBlockPut(brinBlock, &record);
TSDB_CHECK_CODE(code, lino, _exit);
if (BRIN_BLOCK_SIZE(brinBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) {
// TODO
tBrinBlockClear(brinBlock);
}
}
}
if (BRIN_BLOCK_SIZE(brinBlock) > 0) {
// TODO
ASSERT(0);
}
// TODO
ASSERT(0);
TARRAY2_DESTROY(brinBlkArray, NULL);
tBrinBlockDestroy(brinBlock);
taosArrayDestroy(aBlockIdx);
tMapDataClear(mDataBlk);
}
// .data
// .sma
// .stt
for (int32_t i = 0; i < pDFileSet->nSttF; ++i) {
// TODO
}
tsdbDataFReaderClose(&reader);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArray *fileSetArray) {
int32_t code = 0;
int32_t lino = 0;
// TODO
ASSERT(0);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
TFileSetArray fileSetArray[1] = {0};
// load old file system and convert
code = tsdbFSOpen(tsdb, rollback);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) {
SDFileSet *pDFileSet = taosArrayGet(tsdb->fs.aDFileSet, i);
code = tsdbUpgradeFileSet(tsdb, pDFileSet, fileSetArray);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (tsdb->fs.pDelFile != NULL) {
code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbFSClose(tsdb);
TSDB_CHECK_CODE(code, lino, _exit);
// save new file system
char fname[TSDB_FILENAME_LEN];
current_fname(tsdb, fname, TSDB_FCURRENT);
code = save_fs(fileSetArray, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
char fname[TSDB_FILENAME_LEN];
tsdbGetCurrentFName(tsdb, fname, NULL);
if (!taosCheckExistFile(fname)) return 0;
int32_t code = tsdbDoUpgradeFileSystem(tsdb, rollback);
if (code) return code;
taosRemoveFile(fname);
return 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
#include "tsdbDef.h"
#include "tsdbFS2.h"
#include "tsdbUtil2.h"
#ifndef _TSDB_UPGRADE_H_
#define _TSDB_UPGRADE_H_
#ifdef __cplusplus
extern "C" {
#endif
int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback);
#ifdef __cplusplus
}
#endif
#endif /*_TSDB_UPGRADE_H_*/
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册