提交 58ca1acd 编写于 作者: H Hongze Cheng

refactor more code

上级 fea49884
...@@ -525,11 +525,13 @@ static FORCE_INLINE int tsdbAllocBuf(void **ppBuf, uint32_t size) { ...@@ -525,11 +525,13 @@ static FORCE_INLINE int tsdbAllocBuf(void **ppBuf, uint32_t size) {
*ppBuf = taosTRealloc(pBuf, tsize); *ppBuf = taosTRealloc(pBuf, tsize);
if (*ppBuf == NULL) return -1; if (*ppBuf == NULL) return -1;
return 0;
} }
int tsdbEncodeBlockIdx(void** buf, SBlockIdx* pBlockIdx); int tsdbEncodeBlockIdx(void** buf, SBlockIdx* pBlockIdx);
void* tsdbDecodeBlockIdx(void* buf, SBlockIdx* pBlockIdx); void* tsdbDecodeBlockIdx(void* buf, SBlockIdx* pBlockIdx);
int tsdbLoadKeyCol(SReadHandle* pReadH, SBlockInfo* pBlockInfo, SBlock* pBlock); int tsdbLoadKeyCol(SReadHandle* pReadH, SBlockInfo* pBlockInfo, SBlock* pBlock);
int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <limits.h> #include <limits.h>
#include <sys/stat.h> #include <sys/stat.h>
...@@ -44,6 +45,7 @@ typedef struct { ...@@ -44,6 +45,7 @@ typedef struct {
SBlock * pSubBlock; SBlock * pSubBlock;
int nSubBlocks; int nSubBlocks;
SDataCols * pDataCols; SDataCols * pDataCols;
int miter;
} STSCommitHandle; } STSCommitHandle;
typedef struct { typedef struct {
...@@ -63,6 +65,52 @@ typedef struct { ...@@ -63,6 +65,52 @@ typedef struct {
SFileGroup nfgroup; SFileGroup nfgroup;
} SDataFileChange; } SDataFileChange;
static int tsdbStartCommit(SCommitHandle *pCommitH);
static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError);
static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH);
static int tsdbCommitMetaData(SCommitHandle *pCommitH);
static SCommitIter * tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static int tsdbCommitToFileGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup);
static int tsdbHasDataToCommit(STSCommitHandle *pTSCh, TSKEY minKey, TSKEY maxKey);
static STSCommitHandle *tsdbNewTSCommitHandle(STsdbRepo *pRepo);
static void tsdbFreeTSCommitHandle(STSCommitHandle *pTSCh);
static int tsdbLogFileChange(SCommitHandle *pCommitH, STsdbFileChange *pChange);
static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange);
static void * tsdbDecodeFileChange(void *buf, STsdbFileChange *pChange);
static int tsdbLogTSFileChange(SCommitHandle *pCommitH, int fid);
static int tsdbLogMetaFileChange(SCommitHandle *pCommitH);
static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid);
static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd);
static void tsdbSeekTSCommitHandle(STSCommitHandle *pTSCh, TSKEY key);
static int tsdbEncodeSFileGroup(void **buf, SFileGroup *pFGroup);
static void * tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup);
static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pOldGroup, SFileGroup *pNewGroup);
static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid);
static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock);
static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup);
static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError);
static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh);
static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh);
static int tsdbSetCommitTable(STSCommitHandle *pTSCh, STable *pTable);
static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid);
static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx);
static int tsdbAppendCommit(STSCommitHandle *pTSCh);
static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock);
static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pDataCols, SBlock *pBlock,
bool isSuperBlock);
static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh);
static int tsdbUpdateFileGroupInfo(SFileGroup *pFileGroup);
static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh);
static int tsdbCopyBlock(STSCommitHandle *pTSCh, SBlock *pBlock);
static int compareKeyBlock(const void *arg1, const void *arg2);
static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock);
static int tsdbAddSubBlocks(STSCommitHandle *pTSCh, SBlock *pBlocks, int nBlocks);
static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock);
static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock);
static void tsdbLoadMergeFromCache(STSCommitHandle *pTSCh, TSKEY maxKey);
static int tsdbInsertSubBlock(STSCommitHandle *pTSCh, SBlock *pBlock);
int tsdbCommitData(STsdbRepo *pRepo) { int tsdbCommitData(STsdbRepo *pRepo) {
ASSERT(pRepo->commit == 1 && pRepo->imem != NULL); ASSERT(pRepo->commit == 1 && pRepo->imem != NULL);
...@@ -103,7 +151,7 @@ static int tsdbStartCommit(SCommitHandle *pCommitH) { ...@@ -103,7 +151,7 @@ static int tsdbStartCommit(SCommitHandle *pCommitH) {
pCommitH->fd = -1; pCommitH->fd = -1;
tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, pCommitH->fname); tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, &(pCommitH->fname));
pCommitH->fd = open(pCommitH->fname, O_CREAT | O_WRONLY | O_APPEND, 0755); pCommitH->fd = open(pCommitH->fname, O_CREAT | O_WRONLY | O_APPEND, 0755);
if (pCommitH->fd < 0) { if (pCommitH->fd < 0) {
tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pCommitH->fname, strerror(errno)); tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pCommitH->fname, strerror(errno));
...@@ -450,7 +498,7 @@ static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange) { ...@@ -450,7 +498,7 @@ static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange) {
tsize += taosEncodeString(buf, pMetaChange->oname); tsize += taosEncodeString(buf, pMetaChange->oname);
tsize += taosEncodeString(buf, pMetaChange->nname); tsize += taosEncodeString(buf, pMetaChange->nname);
tsize += tdEncodeStoreInfo(buf, pMetaChange->info); tsize += tdEncodeStoreInfo(buf, &(pMetaChange->info));
} else if (pChange->type == TSDB_DATA_FILE_CHANGE) { } else if (pChange->type == TSDB_DATA_FILE_CHANGE) {
SDataFileChange *pDataChange = (SDataFileChange *)pChange->change; SDataFileChange *pDataChange = (SDataFileChange *)pChange->change;
...@@ -463,7 +511,7 @@ static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange) { ...@@ -463,7 +511,7 @@ static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange) {
return tsize; return tsize;
} }
static void *tsdbDecodeFileChange(void *buf, STsdbFileChange *pChange) { static UNUSED_FUNC void *tsdbDecodeFileChange(void *buf, STsdbFileChange *pChange) {
// TODO // TODO
return buf; return buf;
} }
...@@ -534,7 +582,7 @@ static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) { ...@@ -534,7 +582,7 @@ static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) {
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
for (int i = 0; i < pFileH->nFGroups; i++) { for (int i = 0; i < pFileH->nFGroups; i++) {
SFileGroup *pFGroup = pFileH->pFGroup[i]; SFileGroup *pFGroup = pFileH->pFGroup + i;
if (pFGroup->fileId < mfid) { if (pFGroup->fileId < mfid) {
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(STsdbFileChange) + sizeof(SDataFileChange)); SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(STsdbFileChange) + sizeof(SDataFileChange));
if (pNode == NULL) { if (pNode == NULL) {
...@@ -546,7 +594,7 @@ static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) { ...@@ -546,7 +594,7 @@ static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) {
pChange->type = TSDB_DATA_FILE_CHANGE; pChange->type = TSDB_DATA_FILE_CHANGE;
SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change; SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change;
pDataFileChange->ofgroup = pFGroup; pDataFileChange->ofgroup = *pFGroup;
if (tsdbLogFileChange(pCommitH, pChange) < 0) { if (tsdbLogFileChange(pCommitH, pChange) < 0) {
free(pNode); free(pNode);
...@@ -566,7 +614,7 @@ static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd) { ...@@ -566,7 +614,7 @@ static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd) {
SMetaFileChange *pMetaChange = (SMetaFileChange *)pChange->change; SMetaFileChange *pMetaChange = (SMetaFileChange *)pChange->change;
if (isCommitEnd) { if (isCommitEnd) {
if (strncmp(pMetaChange->oname, pMetaChange->nname) != 0) { if (strncmp(pMetaChange->oname, pMetaChange->nname, TSDB_FILENAME_LEN) != 0) {
(void)remove(pMetaChange->oname); (void)remove(pMetaChange->oname);
} }
} else { // roll back } else { // roll back
...@@ -618,7 +666,7 @@ static void *tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup) { ...@@ -618,7 +666,7 @@ static void *tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup) {
return buf; return buf;
} }
static void tsdbGetNextCommitFileGroup(SFileGroup *pOldGroup, SFileGroup *pNewGroup) { static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pOldGroup, SFileGroup *pNewGroup) {
pNewGroup->fileId = pOldGroup->fileId; pNewGroup->fileId = pOldGroup->fileId;
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
...@@ -1243,7 +1291,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1243,7 +1291,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
if (pBlock->numOfRows + pDataCols->numOfRows < pCfg->minRowsPerFileBlock && if (pBlock->numOfRows + pDataCols->numOfRows < pCfg->minRowsPerFileBlock &&
pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && true /*TODO: check if same file*/) { pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && true /*TODO: check if same file*/) {
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_LAST, pDataCols, &newBlock, false) < 0) return -1; if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_LAST, pDataCols, &newBlock, false) < 0) return -1;
// TODO: refactor code here if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1; if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1;
} else { } else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
...@@ -1270,13 +1318,13 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1270,13 +1318,13 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
ASSERT(pDataCols->numOfRows == rows); ASSERT(pDataCols->numOfRows == rows);
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_LAST, pDataCols, &newBlock, false) < 0) return -1; if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_LAST, pDataCols, &newBlock, false) < 0) return -1;
if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1; if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
if (tsdbInsertSubBlock() < 0) return -1; // TODO if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1;
} else { } else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
while (true) { while (true) {
tdResetDataCols(pDataCols); pTSCh->miter = 0;
rows = tsdbLoadMergeFromCache(pTSCh, pTSCh->maxKey); tsdbLoadMergeFromCache(pTSCh, pTSCh->maxKey);
if (rows == 0) break; if (pDataCols->numOfRows == 0) break;
if (tsdbWriteBlockToRightFile(pTSCh, pDataCols, &newBlock) < 0) return -1; if (tsdbWriteBlockToRightFile(pTSCh, pDataCols, &newBlock) < 0) return -1;
if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
} }
...@@ -1342,13 +1390,13 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1342,13 +1390,13 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
ASSERT(pDataCols->numOfRows == rows); ASSERT(pDataCols->numOfRows == rows);
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, false) < 0) return -1; if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, false) < 0) return -1;
if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1; if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
if (tsdbInsertSubBlock() < 0) return -1; // TODO if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1;
} else { } else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
while (true) { while (true) {
tdResetDataCols(pDataCols); pTSCh->miter = 0;
rows = tsdbLoadMergeFromCache(pTSCh, keyLimit); tsdbLoadMergeFromCache(pTSCh, keyLimit);
if (rows == 0) break; if (pDataCols->numOfRows == 0) break;
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, true) < 0) return -1; if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, true) < 0) return -1;
if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
} }
...@@ -1357,7 +1405,93 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1357,7 +1405,93 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
return 0; return 0;
} }
static int tsdbLoadMergeFromCache(STSCommitHandle *pTSCh, TSKEY maxKey) { static void tsdbLoadMergeFromCache(STSCommitHandle *pTSCh, TSKEY maxKey) {
// TODO SReadHandle *pReadH = pTSCh->pReadH;
STsdbRepo * pRepo = pReadH->pRepo;
SDataCols * pMCols = pReadH->pDataCols[0];
SDataCols * pDataCols = pTSCh->pDataCols;
int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pRepo->config.maxRowsPerFileBlock);
SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pReadH->pTable);
TSKEY key1 = 0;
TSKEY key2 = 0;
SDataRow row = NULL;
TSKEY keyNext = 0;
STSchema * pSchema = NULL;
tdResetDataCols(pDataCols);
if (pTSCh->miter >= pMCols->numOfRows) {
key1 = INT64_MAX;
} else {
key1 = dataColsKeyAt(pMCols, pTSCh->miter);
}
keyNext = tsdbNextIterKey(pIter->pIter);
if (TSDB_KEY_BEYOND_RANGE(keyNext, maxKey)) {
key2 = INT64_MAX;
} else {
row = tsdbNextIterRow(pIter->pIter);
key2 = keyNext;
}
while (true) {
if ((key1 == INT64_MAX && key2 == INT64_MAX) || pDataCols->numOfRows >= dbrows) break;
if (key1 <= key2) {
for (int i = 0; i < pMCols->numOfCols; i++) {
dataColAppendVal(pDataCols->cols + i, tdGetColDataOfRow(pMCols->cols + i, pTSCh->miter), pDataCols->numOfRows,
pDataCols->maxPoints);
}
pDataCols->numOfRows++;
pTSCh->miter++;
if (key1 == key2) {
tSkipListIterNext(pIter->pIter);
keyNext = tsdbNextIterKey(pIter->pIter);
if (TSDB_KEY_BEYOND_RANGE(keyNext, maxKey)) {
key2 = INT64_MAX;
} else {
row = tsdbNextIterRow(pIter->pIter);
key2 = keyNext;
}
}
} else {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pIter->pTable, false, false, dataRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendDataRowToDataCol(row, pSchema, pDataCols);
tSkipListIterNext(pIter->pIter);
// update row and key2
keyNext = tsdbNextIterKey(pIter->pIter);
if (TSDB_KEY_BEYOND_RANGE(keyNext, maxKey)) {
key2 = INT64_MAX;
} else {
row = tsdbNextIterRow(pIter->pIter);
key2 = keyNext;
}
}
}
}
static int tsdbInsertSubBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks == 0 && pTSCh->nBlocks > 0);
SBlock *pSuperBlock = pTSCh->pBlockInfo->blocks + pTSCh->nBlocks - 1;
ASSERT(pSuperBlock->numOfSubBlocks > 0 && pSuperBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);
if (pSuperBlock->numOfSubBlocks == 1) {
SBlock oBlock = *pSuperBlock;
oBlock.numOfSubBlocks = 0;
pSuperBlock->offset = sizeof(SBlock) * pTSCh->nSubBlocks;
if (tsdbAddSubBlocks(pTSCh, &oBlock, 1) < 0) return -1;
}
pSuperBlock->numOfSubBlocks++;
pSuperBlock->numOfRows += pBlock->numOfRows;
pSuperBlock->keyFirst = MIN(pSuperBlock->keyFirst, pBlock->keyFirst);
pSuperBlock->keyLast = MAX(pSuperBlock->keyLast, pBlock->keyLast);
if (tsdbAddSubBlocks(pTSCh, pBlock, 1) < 0) return -1;
return 0; return 0;
} }
\ No newline at end of file
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include <errno.h>
#include <regex.h> #include <regex.h>
#define TAOS_RANDOM_FILE_FAIL_TEST #define TAOS_RANDOM_FILE_FAIL_TEST
...@@ -29,7 +30,6 @@ static int compFGroup(const void *arg1, const void *arg2); ...@@ -29,7 +30,6 @@ static int compFGroup(const void *arg1, const void *arg2);
static int keyFGroupCompFunc(const void *key, const void *fgroup); static int keyFGroupCompFunc(const void *key, const void *fgroup);
static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo); static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo);
static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep); static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep);
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days);
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
...@@ -518,6 +518,9 @@ _err: ...@@ -518,6 +518,9 @@ _err:
*size = 0; *size = 0;
} }
int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) {
return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision));
}
// ---------------- LOCAL FUNCTIONS ---------------- // ---------------- LOCAL FUNCTIONS ----------------
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
uint32_t version; uint32_t version;
...@@ -590,7 +593,3 @@ static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) { ...@@ -590,7 +593,3 @@ static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) {
static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) { static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) {
return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]); return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]);
} }
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) {
return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision));
}
\ No newline at end of file
...@@ -24,14 +24,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable); ...@@ -24,14 +24,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData); static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data); static char * tsdbGetTsTupleKey(const void *data);
static void * tsdbCommitData(void *arg);
static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
......
...@@ -31,7 +31,8 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH); ...@@ -31,7 +31,8 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH);
static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx); static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bsize); int maxPoints, char *buffer, int bsize);
static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol,
SDataCol *pDataCol);
SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) { SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) {
SReadHandle *pReadH = (SReadHandle *)calloc(1, sizeof(*pReadH)); SReadHandle *pReadH = (SReadHandle *)calloc(1, sizeof(*pReadH));
...@@ -75,7 +76,6 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { ...@@ -75,7 +76,6 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
ASSERT(pReadH != NULL && pFGroup != NULL); ASSERT(pReadH != NULL && pFGroup != NULL);
STsdbRepo *pRepo = pReadH->pRepo; STsdbRepo *pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config);
pReadH->fGroup = *pFGroup; pReadH->fGroup = *pFGroup;
......
...@@ -17,6 +17,11 @@ ...@@ -17,6 +17,11 @@
#define TAOS_RANDOM_FILE_FAIL_TEST #define TAOS_RANDOM_FILE_FAIL_TEST
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "os.h" #include "os.h"
#include "hash.h" #include "hash.h"
#include "taoserror.h" #include "taoserror.h"
...@@ -40,7 +45,6 @@ typedef struct { ...@@ -40,7 +45,6 @@ typedef struct {
static int tdInitKVStoreHeader(int fd, char *fname); static int tdInitKVStoreHeader(int fd, char *fname);
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
static char * tdGetKVStoreSnapshotFname(char *fdata);
static char * tdGetKVStoreNewFname(char *fdata); static char * tdGetKVStoreNewFname(char *fdata);
static void tdFreeKVStore(SKVStore *pStore); static void tdFreeKVStore(SKVStore *pStore);
static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo);
...@@ -420,17 +424,6 @@ static void tdFreeKVStore(SKVStore *pStore) { ...@@ -420,17 +424,6 @@ static void tdFreeKVStore(SKVStore *pStore) {
} }
} }
static char *tdGetKVStoreSnapshotFname(char *fdata) {
size_t size = strlen(fdata) + strlen(TD_KVSTORE_SNAP_SUFFIX) + 1;
char * fname = malloc(size);
if (fname == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
return NULL;
}
sprintf(fname, "%s%s", fdata, TD_KVSTORE_SNAP_SUFFIX);
return fname;
}
static char *tdGetKVStoreNewFname(char *fdata) { static char *tdGetKVStoreNewFname(char *fdata) {
size_t size = strlen(fdata) + strlen(TD_KVSTORE_NEW_SUFFIX) + 1; size_t size = strlen(fdata) + strlen(TD_KVSTORE_NEW_SUFFIX) + 1;
char * fname = malloc(size); char * fname = malloc(size);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册