未验证 提交 3464140e 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1806 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -115,6 +115,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_ID, 0, 255, "invalid query i ...@@ -115,6 +115,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_ID, 0, 255, "invalid query i
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_STREAM_ID, 0, 256, "invalid stream id") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_STREAM_ID, 0, 256, "invalid stream id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONNECTION, 0, 257, "invalid connection") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONNECTION, 0, 257, "invalid connection")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_ERROR, 0, 258, "sdb error") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_ERROR, 0, 258, "sdb error")
TAOS_DEFINE_ERROR(TSDB_CODE_TIMESTAMP_OUT_OF_RANGE, 0, 259, "timestamp is out of range")
// acct // acct
TAOS_DEFINE_ERROR(TSDB_CODE_ACCT_ALREADY_EXIST, 0, 300, "accounts already exist") TAOS_DEFINE_ERROR(TSDB_CODE_ACCT_ALREADY_EXIST, 0, 300, "accounts already exist")
...@@ -172,6 +173,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 462, "invalid value") ...@@ -172,6 +173,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 462, "invalid value")
// others // others
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 500, "invalid file format") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 500, "invalid file format")
// TSDB
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONFIG, 0, 550, "invalid TSDB configuration")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };
......
...@@ -160,6 +160,7 @@ typedef struct { ...@@ -160,6 +160,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t index; int64_t index;
int numOfCacheBlocks;
SList * memPool; SList * memPool;
} STsdbCachePool; } STsdbCachePool;
...@@ -227,13 +228,13 @@ typedef struct { ...@@ -227,13 +228,13 @@ typedef struct {
int maxFGroups; int maxFGroups;
int numOfFGroups; int numOfFGroups;
SFileGroup fGroup[]; SFileGroup *fGroup;
} STsdbFileH; } STsdbFileH;
#define TSDB_MIN_FILE_ID(fh) (fh)->fGroup[0].fileId #define TSDB_MIN_FILE_ID(fh) (fh)->fGroup[0].fileId
#define TSDB_MAX_FILE_ID(fh) (fh)->fGroup[(fh)->numOfFGroups - 1].fileId #define TSDB_MAX_FILE_ID(fh) (fh)->fGroup[(fh)->numOfFGroups - 1].fileId
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg);
void tsdbCloseFileH(STsdbFileH *pFileH); void tsdbCloseFileH(STsdbFileH *pFileH);
int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader, int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader,
int toClose); int toClose);
...@@ -261,11 +262,12 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter); ...@@ -261,11 +262,12 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter);
typedef struct { typedef struct {
int32_t len; int32_t len;
int32_t offset; int32_t offset;
int32_t padding; // For padding purpose
int32_t hasLast : 1; int32_t hasLast : 1;
int32_t numOfBlocks : 31; int32_t numOfBlocks : 31;
int32_t checksum; int64_t uid;
TSKEY maxKey; TSKEY maxKey;
} SCompIdx; /* sizeof(SCompIdx) = 24 */ } SCompIdx; /* sizeof(SCompIdx) = 28 */
/** /**
* if numOfSubBlocks == 0, then the SCompBlock is a sub-block * if numOfSubBlocks == 0, then the SCompBlock is a sub-block
...@@ -485,6 +487,11 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper); ...@@ -485,6 +487,11 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper);
int tsdbWriteCompInfo(SRWHelper *pHelper); int tsdbWriteCompInfo(SRWHelper *pHelper);
int tsdbWriteCompIdx(SRWHelper *pHelper); int tsdbWriteCompIdx(SRWHelper *pHelper);
// --------- Other functions need to further organize
void tsdbFitRetention(STsdbRepo *pRepo);
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
void tsdbAdjustCacheBlocks(STsdbCache *pCache);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
static int tsdbAllocBlockFromPool(STsdbCache *pCache); static int tsdbAllocBlockFromPool(STsdbCache *pCache);
static void tsdbFreeBlockList(SList *list); static void tsdbFreeBlockList(SList *list);
static void tsdbFreeCacheMem(SCacheMem *mem); static void tsdbFreeCacheMem(SCacheMem *mem);
static int tsdbAddCacheBlockToPool(STsdbCache *pCache);
STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) { STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) {
STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
...@@ -40,13 +41,7 @@ STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) ...@@ -40,13 +41,7 @@ STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo)
if (pPool->memPool == NULL) goto _err; if (pPool->memPool == NULL) goto _err;
for (int i = 0; i < totalBlocks; i++) { for (int i = 0; i < totalBlocks; i++) {
STsdbCacheBlock *pBlock = (STsdbCacheBlock *)malloc(sizeof(STsdbCacheBlock) + cacheBlockSize); if (tsdbAddCacheBlockToPool(pCache) < 0) goto _err;
if (pBlock == NULL) {
goto _err;
}
pBlock->offset = 0;
pBlock->remain = cacheBlockSize;
tdListAppend(pPool->memPool, (void *)(&pBlock));
} }
pCache->mem = NULL; pCache->mem = NULL;
...@@ -142,4 +137,70 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { ...@@ -142,4 +137,70 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
tsdbUnLockRepo(pCache->pRepo); tsdbUnLockRepo(pCache->pRepo);
return 0; return 0;
}
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
STsdbCache *pCache = pRepo->tsdbCache;
int oldNumOfBlocks = pCache->totalCacheBlocks;
tsdbLockRepo((TsdbRepoT *)pRepo);
ASSERT(pCache->totalCacheBlocks != totalBlocks);
if (pCache->totalCacheBlocks < totalBlocks) {
ASSERT(pCache->totalCacheBlocks == pCache->pool.numOfCacheBlocks);
int blocksToAdd = pCache->totalCacheBlocks - totalBlocks;
pCache->totalCacheBlocks = totalBlocks;
for (int i = 0; i < blocksToAdd; i++) {
if (tsdbAddCacheBlockToPool(pCache) < 0) {
tsdbUnLockRepo((TsdbRepoT *)pRepo);
tsdbError("tsdbId %d: failed to add cache block to cache pool", pRepo->config.tsdbId);
return -1;
}
}
} else {
pCache->totalCacheBlocks = totalBlocks;
tsdbAdjustCacheBlocks(pCache);
}
tsdbUnLockRepo((TsdbRepoT *)pRepo);
tsdbTrace("tsdbId %d: tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks, totalBlocks);
return 0;
}
static int tsdbAddCacheBlockToPool(STsdbCache *pCache) {
STsdbCachePool *pPool = &pCache->pool;
STsdbCacheBlock *pBlock = malloc(sizeof(STsdbCacheBlock) + pCache->cacheBlockSize);
if (pBlock == NULL) return -1;
pBlock->offset = 0;
pBlock->remain = pCache->cacheBlockSize;
tdListAppend(pPool->memPool, (void *)(&pBlock));
pPool->numOfCacheBlocks++;
return 0;
}
static int tsdbRemoveCacheBlockFromPool(STsdbCache *pCache) {
STsdbCachePool *pPool = &pCache->pool;
STsdbCacheBlock *pBlock = NULL;
ASSERT(pCache->totalCacheBlocks >= 0);
SListNode *node = tdListPopHead(pPool->memPool);
if (node == NULL) return -1;
tdListNodeGetData(pPool->memPool, node, &pBlock);
free(pBlock);
listNodeFree(node);
pPool->numOfCacheBlocks--;
return 0;
}
void tsdbAdjustCacheBlocks(STsdbCache *pCache) {
while (pCache->totalCacheBlocks < pCache->pool.numOfCacheBlocks) {
if (tsdbRemoveCacheBlockFromPool(pCache) < 0) break;
}
} }
\ No newline at end of file
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "tchecksum.h" #include "tchecksum.h"
#include "tsdbMain.h" #include "tsdbMain.h"
#include "tutil.h" #include "tutil.h"
#include "ttime.h"
const char *tsdbFileSuffix[] = { const char *tsdbFileSuffix[] = {
".head", // TSDB_FILE_TYPE_HEAD ".head", // TSDB_FILE_TYPE_HEAD
...@@ -40,13 +41,19 @@ static int tsdbWriteFileHead(SFile *pFile); ...@@ -40,13 +41,19 @@ static int tsdbWriteFileHead(SFile *pFile);
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid);
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) {
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH));
if (pFileH == NULL) { // TODO: deal with ERROR here if (pFileH == NULL) { // TODO: deal with ERROR here
return NULL; return NULL;
} }
pFileH->maxFGroups = maxFiles; pFileH->maxFGroups = pCfg->keep / pCfg->daysPerFile + 3;
pFileH->fGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup));
if (pFileH->fGroup == NULL) {
free(pFileH);
return NULL;
}
DIR *dir = opendir(dataDir); DIR *dir = opendir(dataDir);
if (dir == NULL) { if (dir == NULL) {
...@@ -69,7 +76,12 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { ...@@ -69,7 +76,12 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
return pFileH; return pFileH;
} }
void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); } void tsdbCloseFileH(STsdbFileH *pFileH) {
if (pFileH) {
tfree(pFileH->fGroup);
free(pFileH);
}
}
static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) { static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) {
tsdbGetFileName(dataDir, fid, suffix, pFile->fname); tsdbGetFileName(dataDir, fid, suffix, pFile->fname);
...@@ -161,6 +173,18 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct ...@@ -161,6 +173,18 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct
} }
} }
void tsdbFitRetention(STsdbRepo *pRepo) {
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = pFileH->fGroup;
int mfid =
tsdbGetKeyFileId(taosGetTimestamp(pRepo->config.precision), pRepo->config.daysPerFile, pRepo->config.precision);
while (pGroup[0].fileId < mfid) {
tsdbRemoveFileGroup(pFileH, pGroup[0].fileId);
}
}
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
if (pIter->numOfFGroups == 0) { if (pIter->numOfFGroups == 0) {
assert(pIter->pFileGroup == NULL); assert(pIter->pFileGroup == NULL);
...@@ -252,43 +276,6 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf ...@@ -252,43 +276,6 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf
return 0; return 0;
} }
// int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) {
// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
// if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
// if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1;
// // TODO: need to check the correctness
// return 0;
// }
// int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) {
// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
// if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1;
// if (read(pFile->fd, buf, pIdx->len) < 0) return -1;
// // TODO: need to check the correctness
// return 0;
// }
// int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) {
// // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
// if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1;
// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols;
// if (read(pFile->fd, buf, size) < 0) return -1;
// return 0;
// }
// int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) {
// if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1;
// if (read(pFile->fd, buf, pCol->len) < 0) return -1;
// return 0;
// }
static int compFGroupKey(const void *key, const void *fgroup) { static int compFGroupKey(const void *key, const void *fgroup) {
int fid = *(int *)key; int fid = *(int *)key;
SFileGroup *pFGroup = (SFileGroup *)fgroup; SFileGroup *pFGroup = (SFileGroup *)fgroup;
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include "tsdbMain.h" #include "tsdbMain.h"
#include "tscompression.h" #include "tscompression.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "ttime.h"
int tsdbDebugFlag = 135; int tsdbDebugFlag = 135;
...@@ -27,7 +28,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); ...@@ -27,7 +28,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
// static int tsdbOpenMetaFile(char *tsdbDir); // static int tsdbOpenMetaFile(char *tsdbDir);
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock); static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now);
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
static void * tsdbCommitData(void *arg); static void * tsdbCommitData(void *arg);
...@@ -35,8 +36,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **i ...@@ -35,8 +36,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **i
SDataCols *pDataCols); SDataCols *pDataCols);
static TSKEY tsdbNextIterKey(SSkipListIterator *pIter); static TSKEY tsdbNextIterKey(SSkipListIterator *pIter);
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey);
// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
// int64_t uid); static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name) #define TSDB_GET_TABLE_BY_NAME(pRepo, name)
...@@ -214,7 +216,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { ...@@ -214,7 +216,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
} }
tsdbGetDataDirName(pRepo, dataDir); tsdbGetDataDirName(pRepo, dataDir);
pRepo->tsdbFileH = tsdbInitFileH(dataDir, pRepo->config.maxTables); pRepo->tsdbFileH = tsdbInitFileH(dataDir, &(pRepo->config));
if (pRepo->tsdbFileH == NULL) { if (pRepo->tsdbFileH == NULL) {
tsdbFreeCache(pRepo->tsdbCache); tsdbFreeCache(pRepo->tsdbCache);
tsdbFreeMeta(pRepo->tsdbMeta); tsdbFreeMeta(pRepo->tsdbMeta);
...@@ -297,10 +299,23 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) { ...@@ -297,10 +299,23 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) {
*/ */
int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) { int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbCfg * pRCfg = &pRepo->config;
pRepo->config = *pCfg; if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return TSDB_CODE_INVALID_CONFIG;
// TODO
return 0; ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
ASSERT(pRCfg->precision == pCfg->precision);
if (pRCfg->compression != pCfg->compression) tsdbAlterCompression(pRepo, pCfg->compression);
if (pRCfg->keep != pCfg->keep) tsdbAlterKeep(pRepo, pCfg->keep);
if (pRCfg->totalBlocks != pCfg->totalBlocks) tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
if (pRCfg->maxTables != pCfg->maxTables) tsdbAlterMaxTables(pRepo, pCfg->maxTables);
return TSDB_CODE_SUCCESS;
} }
int32_t tsdbTriggerCommit(TsdbRepoT *repo) { int32_t tsdbTriggerCommit(TsdbRepoT *repo) {
...@@ -394,13 +409,16 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) { ...@@ -394,13 +409,16 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) {
// TODO: need to return the number of data inserted // TODO: need to return the number of data inserted
int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg) { int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg) {
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
STsdbRepo *pRepo = (STsdbRepo *)repo;
tsdbInitSubmitMsgIter(pMsg, &msgIter); tsdbInitSubmitMsgIter(pMsg, &msgIter);
SSubmitBlk *pBlock = NULL; SSubmitBlk *pBlock = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
TSKEY now = taosGetTimestamp(pRepo->config.precision);
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
if ((code = tsdbInsertDataToTable(repo, pBlock)) != TSDB_CODE_SUCCESS) { if ((code = tsdbInsertDataToTable(repo, pBlock, now)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
...@@ -787,21 +805,31 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -787,21 +805,31 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
return 0; return 0;
} }
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock) { static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid}; STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId); STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
if (pTable == NULL) { if (pTable == NULL) {
uError("failed to get table for insert, uid:%" PRIu64 ", tid:%d", tableId.uid, tableId.tid); tsdbError("failed to get table for insert, uid:%" PRIu64 ", tid:%d", tableId.uid, tableId.tid);
return TSDB_CODE_INVALID_TABLE_ID; return TSDB_CODE_INVALID_TABLE_ID;
} }
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter = {0};
SDataRow row; SDataRow row = NULL;
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
tsdbInitSubmitBlkIter(pBlock, &blkIter); tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) {
tsdbError(
"tsdbId: %d, table tid: %d, talbe uid: %ld timestamp is out of range. now: %ld maxKey: %ld, minKey: %ld",
pRepo->config.tsdbId, pTable->tableId.tid, pTable->tableId.uid, now, minKey, maxKey);
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
}
if (tdInsertRowToTable(pRepo, row, pTable) < 0) { if (tdInsertRowToTable(pRepo, row, pTable) < 0) {
return -1; return -1;
} }
...@@ -903,6 +931,9 @@ static void *tsdbCommitData(void *arg) { ...@@ -903,6 +931,9 @@ static void *tsdbCommitData(void *arg) {
} }
} }
// Do retention actions
tsdbFitRetention(pRepo);
_exit: _exit:
tdFreeDataCols(pDataCols); tdFreeDataCols(pDataCols);
tsdbDestroyTableIters(iters, pCfg->maxTables); tsdbDestroyTableIters(iters, pCfg->maxTables);
...@@ -910,6 +941,7 @@ _exit: ...@@ -910,6 +941,7 @@ _exit:
tsdbLockRepo(arg); tsdbLockRepo(arg);
tdListMove(pCache->imem->list, pCache->pool.memPool); tdListMove(pCache->imem->list, pCache->pool.memPool);
tsdbAdjustCacheBlocks(pCache);
tdListFree(pCache->imem->list); tdListFree(pCache->imem->list);
free(pCache->imem); free(pCache->imem);
pCache->imem = NULL; pCache->imem = NULL;
...@@ -1028,4 +1060,27 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK ...@@ -1028,4 +1060,27 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
} }
return 0; return 0;
}
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) {
pRepo->config.compression = compression;
}
static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
STsdbCfg *pCfg = &pRepo->config;
int maxFiles = keep / pCfg->maxTables + 3;
if (pRepo->config.keep > keep) {
pRepo->tsdbFileH->maxFGroups = maxFiles;
} else {
pRepo->tsdbFileH->fGroup = realloc(pRepo->tsdbFileH->fGroup, sizeof(SFileGroup));
if (pRepo->tsdbFileH->fGroup == NULL) {
// TODO: deal with the error
}
pRepo->tsdbFileH->maxFGroups = maxFiles;
}
}
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
// TODO
} }
\ No newline at end of file
...@@ -414,6 +414,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -414,6 +414,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0); ASSERT((pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len); taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END); pIdx->offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
pIdx->uid = pHelper->tableInfo.uid;
if (pIdx->offset < 0) return -1; if (pIdx->offset < 0) return -1;
ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx)); ASSERT(pIdx->offset >= tsizeof(pHelper->pCompIdx));
......
...@@ -22,22 +22,37 @@ extern "C" { ...@@ -22,22 +22,37 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <time.h> #include <time.h>
#include "tutil.h"
//@return timestamp in second //@return timestamp in second
int32_t taosGetTimestampSec(); int32_t taosGetTimestampSec();
//@return timestamp in millisecond //@return timestamp in millisecond
int64_t taosGetTimestampMs(); static FORCE_INLINE int64_t taosGetTimestampMs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000L + (uint64_t)systemTime.tv_usec / 1000;
}
//@return timestamp in microsecond //@return timestamp in microsecond
int64_t taosGetTimestampUs(); static FORCE_INLINE int64_t taosGetTimestampUs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000000L + (uint64_t)systemTime.tv_usec;
}
/* /*
* @return timestamp decided by global conf variable, tsTimePrecision * @return timestamp decided by global conf variable, tsTimePrecision
* if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond. * if precision == TSDB_TIME_PRECISION_MICRO, it returns timestamp in microsecond.
* precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond. * precision == TSDB_TIME_PRECISION_MILLI, it returns timestamp in millisecond.
*/ */
int64_t taosGetTimestamp(int32_t precision); static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
if (precision == TSDB_TIME_PRECISION_MICRO) {
return taosGetTimestampUs();
} else {
return taosGetTimestampMs();
}
}
int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts); int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts);
......
...@@ -121,30 +121,6 @@ static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec); ...@@ -121,30 +121,6 @@ static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
int32_t taosGetTimestampSec() { return (int32_t)time(NULL); } int32_t taosGetTimestampSec() { return (int32_t)time(NULL); }
int64_t taosGetTimestampMs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000L + (uint64_t)systemTime.tv_usec / 1000;
}
int64_t taosGetTimestampUs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000000L + (uint64_t)systemTime.tv_usec;
}
/*
* If tsTimePrecision == 1, taosGetTimestamp will return timestamp in microsecond.
* Otherwise, it will return timestamp in millisecond.
*/
int64_t taosGetTimestamp(int32_t precision) {
if (precision == TSDB_TIME_PRECISION_MICRO) {
return taosGetTimestampUs();
} else {
return taosGetTimestampMs();
}
}
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec) { int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec) {
/* parse datatime string in with tz */ /* parse datatime string in with tz */
if (strnchr(timestr, 'T', len, false) != NULL) { if (strnchr(timestr, 'T', len, false) != NULL) {
......
...@@ -111,11 +111,13 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -111,11 +111,13 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
int32_t code = 0; int32_t code = 0;
dTrace("pVnode:%p vgId:%d, table:%s, start to create", pVnode, pVnode->vgId, pTable->tableId); dTrace("pVnode:%p vgId:%d, table:%s, start to create", pVnode, pVnode->vgId, pTable->tableId);
int16_t numOfColumns = htons(pTable->numOfColumns); int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags); int16_t numOfTags = htons(pTable->numOfTags);
int32_t sid = htonl(pTable->sid); int32_t sid = htonl(pTable->sid);
uint64_t uid = htobe64(pTable->uid); uint64_t uid = htobe64(pTable->uid);
SSchema *pSchema = (SSchema *) pTable->data; SSchema * pSchema = (SSchema *)pTable->data;
STSchema *pDestTagSchema = NULL;
SDataRow dataRow = NULL;
int32_t totalCols = numOfColumns + numOfTags; int32_t totalCols = numOfColumns + numOfTags;
...@@ -130,7 +132,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -130,7 +132,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
tsdbTableSetName(&tCfg, pTable->tableId, false); tsdbTableSetName(&tCfg, pTable->tableId, false);
if (numOfTags != 0) { if (numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(numOfTags); pDestTagSchema = tdNewSchema(numOfTags);
for (int i = numOfColumns; i < totalCols; i++) { for (int i = numOfColumns; i < totalCols; i++) {
tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
} }
...@@ -140,7 +142,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -140,7 +142,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
char *pTagData = pTable->data + totalCols * sizeof(SSchema); char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0; int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < numOfTags; i++) { for (int i = 0; i < numOfTags; i++) {
STColumn *pTCol = schemaColAt(pDestTagSchema, i); STColumn *pTCol = schemaColAt(pDestTagSchema, i);
...@@ -151,6 +153,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -151,6 +153,8 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
} }
code = tsdbCreateTable(pVnode->tsdb, &tCfg); code = tsdbCreateTable(pVnode->tsdb, &tCfg);
tdFreeDataRow(dataRow);
tfree(pDestTagSchema);
tfree(pDestSchema); tfree(pDestSchema);
dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code); dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册