提交 f4ca77f7 编写于 作者: H Hongze Cheng

TD-353

上级 ea86b6e8
......@@ -78,9 +78,9 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo);
// --------- TSDB REPOSITORY DEFINITION
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg);
int32_t tsdbDropRepo(TSDB_REPO_T *repo);
int32_t tsdbDropRepo(char *rootDir);
TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH);
int32_t tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg);
// --------- TSDB TABLE DEFINITION
......
......@@ -50,7 +50,7 @@ typedef struct STable {
uint64_t suid;
struct STable* pSuper; // super table pointer
uint8_t numOfSchemas;
STSchema schema[TSDB_MAX_TABLE_SCHEMAS];
STSchema* schema[TSDB_MAX_TABLE_SCHEMAS];
STSchema* tagSchema;
SKVRow tagVal;
void* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
......@@ -316,9 +316,9 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
void tsdbFreeFileH(STsdbFileH* pFileH);
int* tsdbOpenFileH(STsdbRepo* pRepo);
int tsdbOpenFileH(STsdbRepo* pRepo);
void tsdbCloseFileH(STsdbRepo* pRepo);
SFileGroup* tsdbCreateFGroupIfNeed(STsdbFileH* pFileH, char* dataDir, int fid, int maxTables);
SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid, int maxTables);
void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction);
void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid);
SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter);
......@@ -327,6 +327,7 @@ void tsdbCloseFile(SFile* pFile);
int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
void tsdbFitRetention(STsdbRepo* pRepo);
int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
// ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
......@@ -345,6 +346,27 @@ void tsdbFitRetention(STsdbRepo* pRepo);
#define helperRepo(h) (h)->pRepo
#define helperState(h) (h)->state
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
void tsdbDestroyHelper(SRWHelper* pHelper);
void tsdbResetHelper(SRWHelper* pHelper);
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
int tsdbWriteDataBlock(SRWHelper* pHelper, SDataCols* pDataCols);
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
int tsdbWriteCompInfo(SRWHelper* pHelper);
int tsdbWriteCompIdx(SRWHelper* pHelper);
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
int tsdbloadcompdata(srwhelper* phelper, scompblock* pcompblock, void* target);
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SDataCols* pDataCols, int blkIdx, int16_t* colIds, int numOfColIds);
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SDataCols* target);
int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
void* tsdbEncodeSFileInfo(void* buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
// ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId
#define IS_REPO_LOCKED(r) (r)->repoLocked
......@@ -353,6 +375,9 @@ char* tsdbGetMetaFileName(char* rootDir);
char* tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type);
int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo);
char* tsdbGetDataDirName(char* rootDir);
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
#if 0
......@@ -389,8 +414,6 @@ void tsdbAdjustCacheBlocks(STsdbCache *pCache);
int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version);
int compFGroupKey(const void *key, const void *fgroup);
#endif
#ifdef __cplusplus
......
......@@ -36,6 +36,8 @@ static void tsdbDestroyFile(SFile *pFile);
static int compFGroup(const void *arg1, const void *arg2);
static int keyFGroupCompFunc(const void *key, const void *fgroup);
static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup);
static void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo);
static void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo);
// ---------------- INTERNAL FUNCTIONS ----------------
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
......@@ -75,7 +77,7 @@ void tsdbFreeFileH(STsdbFileH *pFileH) {
}
}
int *tsdbOpenFileH(STsdbRepo *pRepo) {
int tsdbOpenFileH(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL);
char *tDataDir = NULL;
......@@ -83,7 +85,7 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) {
int fid = 0;
SFileGroup fileGroup = {0};
STsdbFileH pFileH = pRepo->tsdbFileH;
STsdbFileH *pFileH = pRepo->tsdbFileH;
tDataDir = tsdbGetDataDirName(pRepo->rootDir);
if (tDataDir == NULL) {
......@@ -91,7 +93,7 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) {
goto _err;
}
DIR *dir = opendir(tDataDir);
dir = opendir(tDataDir);
if (dir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), tDataDir, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -105,7 +107,7 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) {
if (tsdbSearchFGroup(pRepo->tsdbFileH, fid, TD_EQ) != NULL) return 0;
fileGroup = {0};
memset((void *)(&fileGroup), 0, sizeof(SFileGroup));
fileGroup.fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) {
......@@ -116,7 +118,7 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) {
tsdbTrace("vgId:%d file group %d init", REPO_ID(pRepo), fid);
pFileH->[pFileH->nFGroups++] = fileGroup;
pFileH->pFGroup[pFileH->nFGroups++] = fileGroup;
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
}
......@@ -128,7 +130,7 @@ _err:
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]);
tfree(tDataDir);
if (dir != NULL) closedir(tDataDir);
if (dir != NULL) closedir(dir);
tsdbCloseFileH(pRepo);
return -1;
}
......@@ -139,28 +141,30 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
for (int i = 0; i < pFileH->nFGroups; i++) {
SFileGroup *pFGroup = pFileH->pFGroup + i;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbDestroyFile(pFGroup->files[type]);
tsdbDestroyFile(&pFGroup->files[type]);
}
}
}
SFileGroup *tsdbCreateFGroupIfNeed(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
if (pFileH->numOfFGroups >= pFileH->maxFGroups) return NULL;
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int maxTables) {
STsdbFileH *pFileH = pRepo->tsdbFileH;
if (pFileH->nFGroups >= pFileH->maxFGroups) return NULL;
SFileGroup fGroup;
SFileGroup *pFGroup = &fGroup;
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid);
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
if (pGroup == NULL) { // if not exists, create one
pFGroup->fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], &(pFGroup->files[type])) < 0)
if (tsdbCreateFile(&pFGroup->files[type], pRepo, fid, type) < 0)
goto _err;
}
pFileH->fGroup[pFileH->numOfFGroups++] = fGroup;
qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup);
return tsdbSearchFGroup(pFileH, fid);
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
return tsdbSearchFGroup(pFileH, fid, TD_EQ);
}
return pGroup;
......@@ -172,15 +176,15 @@ _err:
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { // TODO
pIter->direction = direction;
pIter->base = pFileH->fGroup;
pIter->numOfFGroups = pFileH->numOfFGroups;
if (pFileH->numOfFGroups == 0) {
pIter->base = pFileH->pFGroup;
pIter->numOfFGroups = pFileH->nFGroups;
if (pFileH->nFGroups == 0) {
pIter->pFileGroup = NULL;
} else {
if (direction == TSDB_FGROUP_ITER_FORWARD) {
pIter->pFileGroup = pFileH->fGroup;
pIter->pFileGroup = pFileH->pFGroup;
} else {
pIter->pFileGroup = pFileH->fGroup + pFileH->numOfFGroups - 1;
pIter->pFileGroup = pFileH->pFGroup + pFileH->nFGroups - 1;
}
}
}
......@@ -274,7 +278,7 @@ _err:
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
void *ptr =
taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc);
taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
if (ptr == NULL) return NULL;
return (SFileGroup *)ptr;
}
......@@ -289,13 +293,35 @@ void tsdbFitRetention(STsdbRepo *pRepo) {
pthread_rwlock_wrlock(&(pFileH->fhlock));
while (pFileH->numOfFGroups > 0 && pGroup[0].fileId < mfid) {
tsdbRemoveFileGroup(pFileH, pGroup);
while (pFileH->nFGroups > 0 && pGroup[0].fileId < mfid) {
tsdbRemoveFileGroup(pRepo, pGroup);
}
pthread_rwlock_unlock(&(pFileH->fhlock))
pthread_rwlock_unlock(&(pFileH->fhlock));
}
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
void *pBuf = (void *)buf;
pBuf = taosEncodeFixedU32(pBuf, version);
pBuf = tsdbEncodeSFileInfo(pBuf, &(pFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
if (lseek(pFile->fd, 0, SEEK_SET) < 0) {
tsdbError("failed to lseek file %s since %s", pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
// ---------------- LOCAL FUNCTIONS ----------------
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
......@@ -379,3 +405,25 @@ static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
tsdbDestroyFile(&fileGroup.files[type]);
}
}
static void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) {
buf = taosEncodeFixedU32(buf, pInfo->offset);
buf = taosEncodeFixedU32(buf, pInfo->len);
buf = taosEncodeFixedU64(buf, pInfo->size);
buf = taosEncodeFixedU64(buf, pInfo->tombSize);
buf = taosEncodeFixedU32(buf, pInfo->totalBlocks);
buf = taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
return buf;
}
static void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->offset));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU64(buf, &(pInfo->size));
buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
return buf;
}
\ No newline at end of file
......@@ -29,6 +29,10 @@
#define TSDB_DATA_DIR_NAME "data"
#define TSDB_META_FILE_NAME "meta"
#define TSDB_META_FILE_INDEX 10000000
#define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
typedef struct {
int32_t totalLen;
......@@ -42,6 +46,24 @@ typedef struct {
SSubmitBlk *pBlock;
} SSubmitMsgIter;
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg);
static int32_t tsdbUnsetRepoEnv(char *rootDir);
static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg);
static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg);
static char * tsdbGetCfgFname(char *rootDir);
static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg);
static void tsdbFreeRepo(STsdbRepo *pRepo);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows);
static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter);
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbRestoreInfo(STsdbRepo *pRepo);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables);
// Function declaration
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
if (mkdir(rootDir, 0755) < 0) {
......@@ -122,8 +144,6 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
tsdbCloseBufPool(pRepo);
tsdbCloseMeta(pRepo);
tsdbTrace("vgId:%d repository is closed", REPO_ID(pRepo));
return 0;
}
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
......@@ -136,7 +156,6 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
}
SSubmitBlk *pBlock = NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t affectedrows = 0;
TSKEY now = taosGetTimestamp(pRepo->config.precision);
......@@ -156,7 +175,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
// STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
uint32_t magic = 0;
char fname[256] = "\0";
char *fname = NULL;
struct stat fState;
......@@ -169,9 +188,9 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
if (name[0] == 0) { // get the file from index or after, but not larger than eindex
int fid = (*index) / 3;
if (pFileH->numOfFGroups == 0 || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) {
if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) {
if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) {
tsdbGetMetaFileName(pRepo->rootDir, fname);
fname = tsdbGetMetaFileName(pRepo->rootDir);
*index = TSDB_META_FILE_INDEX;
} else {
tfree(sdup);
......@@ -179,7 +198,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
}
} else {
SFileGroup *pFGroup =
taosbsearch(&fid, pFileH->fGroup, pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey, TD_GE);
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), compFGroupKey, TD_GE);
if (pFGroup->fileId == fid) {
strcpy(fname, pFGroup->files[(*index) % 3].fname);
} else {
......@@ -195,10 +214,10 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
strcpy(name, fname + strlen(prefix));
} else { // get the named file at the specified index. If not there, return 0
if (*index == TSDB_META_FILE_INDEX) { // get meta file
tsdbGetMetaFileName(pRepo->rootDir, fname);
fname = tsdbGetMetaFileName(pRepo->rootDir);
} else {
int fid = (*index) / 3;
SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid);
SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
if (pFGroup == NULL) { // not found
tfree(sdup);
return 0;
......@@ -218,6 +237,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
*size = fState.st_size;
magic = *size;
tfree(fname);
return magic;
}
......@@ -229,7 +249,7 @@ void tsdbStartStream(TSDB_REPO_T *repo) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TALBE_UID(pTable), TABLE_TID(pTable), pTable->sql,
tsdbGetTableSchema(pMeta, pTable));
tsdbGetTableSchema(pTable));
}
}
}
......@@ -270,7 +290,7 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
tsdbAlterMaxTables(pRepo, pCfg->maxTables);
}
if (configChanged) tsdbSaveConfig(pRepo);
if (configChanged) tsdbSaveConfig(pRepo->rootDir, &pRepo->config);
return TSDB_CODE_SUCCESS;
}
......@@ -302,7 +322,7 @@ char *tsdbGetDataFileName(STsdbRepo *pRepo, int fid, int type) {
char *fname = malloc(tlen);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
return NULL;
}
sprintf(fname, "%s/%s/v%df%d.%s", pRepo->rootDir, TSDB_DATA_DIR_NAME, REPO_ID(pRepo), fid, tsdbFileSuffix[type]);
......@@ -331,6 +351,18 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
return 0;
}
char *tsdbGetDataDirName(char *rootDir) {
int tlen = strlen(rootDir) + strlen(TSDB_DATA_DIR_NAME) + 2;
char *fname = calloc(1, tlen);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
snprintf(fname, tlen, "%s/%s", rootDir, TSDB_DATA_DIR_NAME);
return fname;
}
STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
......@@ -413,7 +445,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
}
if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) {
tsdbError("vgId:%d invalid configuration! minRowsPerFileBlock %d maxRowsPerFileBlock %d" pCfg->tsdbId,
tsdbError("vgId:%d invalid configuration! minRowsPerFileBlock %d maxRowsPerFileBlock %d", pCfg->tsdbId,
pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock);
goto _err;
}
......@@ -488,7 +520,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
if (fd < 0) {
tsdbError("vgId:%d failed to open file %s since %s", pCfg->tsdbId, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err
goto _err;
}
if (twrite(fd, (void *)pCfg, sizeof(STsdbCfg)) < sizeof(STsdbCfg)) {
......@@ -560,17 +592,6 @@ static char *tsdbGetCfgFname(char *rootDir) {
return fname;
}
static char *tsdbGetDataDirName(char *rootDir) {
int tlen = strlen(rootDir) + strlen(TSDB_DATA_DIR_NAME) + 2;
char *fname = calloc(1, tlen);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
snprintf(fname, tlen, "%s/%s", rootDir, TSDB_DATA_DIR_NAME);
return fname;
}
static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(STsdbRepo));
......@@ -626,8 +647,8 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
tsdbFreeFileH(pRepo->tsdbFileH);
tsdbFreeBufPool(pRepo->pPool);
tsdbFreeMeta(pRepo->tsdbMeta);
tsdbFreeMemTable(pRepo->mem);
tsdbFreeMemTable(pRepo->imem);
// tsdbFreeMemTable(pRepo->mem);
// tsdbFreeMemTable(pRepo->imem);
tfree(pRepo->rootDir);
pthread_mutex_destroy(&pRepo->mutex);
free(pRepo);
......@@ -660,7 +681,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
STsdbMeta *pMeta = pRepo->tsdbMeta;
int64_t points = 0;
STable *pTable == tsdbGetTableByUid(pMeta, pBlock->uid);
STable *pTable = tsdbGetTableByUid(pMeta, pBlock->uid);
if (pTable == NULL || TABLE_TID(pTable) != pBlock->tid) {
tsdbError("vgId:%d failed to get table to insert data, uid " PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
......@@ -676,7 +697,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
// Check schema version
int32_t tversion = pBlock->sversion;
STSchema *pSchema = tsdbGetTableSchema(pMeta, pTable);
STSchema *pSchema = tsdbGetTableSchema(pTable);
ASSERT(pSchema != NULL);
int16_t nversion = schemaVersion(pSchema);
if (tversion > nversion) {
......@@ -701,9 +722,9 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
tsdbClearTableCfg(pTableCfg);
rpcFreeCont(msg);
pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, tversion);
pSchema = tsdbGetTableSchemaByVersion(pTable, tversion);
} else if (tversion < nversion) {
pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, tversion);
pSchema = tsdbGetTableSchemaByVersion(pTable, tversion);
if (pSchema == NULL) {
tsdbError("vgId:%d table %s tid %d invalid schema version %d from client", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), tversion);
......@@ -829,8 +850,8 @@ static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
pRepo->tsdbFileH->maxFGroups = maxFiles;
} else {
pRepo->config.keep = keep;
pRepo->tsdbFileH->fGroup = realloc(pRepo->tsdbFileH->fGroup, sizeof(SFileGroup));
if (pRepo->tsdbFileH->fGroup == NULL) {
pRepo->tsdbFileH->pFGroup = realloc(pRepo->tsdbFileH->pFGroup, sizeof(SFileGroup));
if (pRepo->tsdbFileH->pFGroup == NULL) {
// TODO: deal with the error
}
pRepo->tsdbFileH->maxFGroups = maxFiles;
......@@ -846,7 +867,6 @@ static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
pMeta->maxTables = maxTables;
pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *));
memset(&pMeta->tables[oldMaxTables], 0, sizeof(STable *) * (maxTables - oldMaxTables));
pRepo->config.maxTables = maxTables;
......
......@@ -92,7 +92,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
}
ASSERT(pTableData != NULL) && pTableData->uid == TALBE_UID(pTable);
ASSERT((pTableData != NULL) && pTableData->uid == TALBE_UID(pTable));
if (tSkipListPut(pTableData->pData, pNode) == NULL) {
tsdbFreeBytes(pRepo, (void *)pNode, bytes);
......@@ -107,7 +107,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData));
STSchema *pSchema = tsdbGetTableSchema(pTable);
if (schemaNCols(pSchema) > pMemTable->maxCols) pMemTable->maxCols = schemaNCols;
if (schemaNCols(pSchema) > pMemTable->maxCols) pMemTable->maxCols = schemaNCols(pSchema);
if (schemaTLen(pSchema) > pMemTable->maxRowBytes) pMemTable->maxRowBytes = schemaTLen(pSchema);
}
......@@ -167,6 +167,8 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
tsdbRefMemTable(pRepo, *pIMem);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
return 0;
}
// ---------------- LOCAL FUNCTIONS ----------------
......@@ -174,11 +176,11 @@ static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL);
if (pRepo->mem == NULL) return NULL;
SListNode *pNode = listTail(pRepo->mem);
SListNode *pNode = listTail(pRepo->mem->bufBlockList);
if (pNode == NULL) return NULL;
STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pMemTable->bufBlockList, pNode, (void *)(&pBufBlock));
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock));
return pBufBlock;
}
......@@ -189,7 +191,7 @@ static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
int code = 0;
if (pBufBlock != NULL && pBufBlock->remain < bytes) {
if (listNEles(pRepo->mem) >= pCfg->totalBlocks / 2) { // need to commit mem
if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 2) { // need to commit mem
if (pRepo->imem) {
code = pthread_join(pRepo->commitThread, NULL);
if (code != 0) {
......@@ -358,6 +360,7 @@ static void *tsdbCommitData(void *arg) {
STsdbCfg * pCfg = &pRepo->config;
SDataCols * pDataCols = NULL;
SCommitIter *iters = NULL;
SRWHelper whelper = {0};
ASSERT(pRepo->commit == 1);
ASSERT(pMem != NULL);
......@@ -379,7 +382,7 @@ static void *tsdbCommitData(void *arg) {
if ((pDataCols = tdNewDataCols(pMem->maxRowBytes, pMem->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
REPO_ID(pRepo), pMem->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _exit;
}
......
......@@ -42,7 +42,6 @@ static int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
static int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid);
static int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup);
static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
static void tsdbClearTableCfg(STableCfg *config);
static void * tsdbEncodeTableName(void *buf, tstr *name);
static void * tsdbDecodeTableName(void *buf, tstr **name);
static void * tsdbEncodeTable(void *buf, STable *pTable);
......@@ -122,7 +121,7 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, varDataVal(pTable->name),
tableId.tid, tableId.uid);
if (tsdbRemoveTableFromMeta(pMeta, pTable, true) < 0) return -1;
tsdbRemoveTableFromMeta(pMeta, pTable, true);
return 0;
}
......@@ -132,7 +131,7 @@ void *tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, i
STsdbMeta *pMeta = tsdbGetMeta(repo);
STable * pTable = tsdbGetTableByUid(pMeta, id->uid);
STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STSchema *pSchema = tsdbGetTableTagSchema(pTable);
STColumn *pCol = tdGetColOfID(pSchema, colId);
if (pCol == NULL) {
return NULL; // No matched tag volumn
......@@ -255,7 +254,7 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
return -1;
}
if (TABLE_TID(pTable) != htonl(pMsg->tid)) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
......@@ -457,7 +456,7 @@ int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) {
isChanged = true;
}
STSchema *pTSchema = tsdbGetTableSchema(pMeta, pTable);
STSchema *pTSchema = tsdbGetTableSchema(pTable);
if (schemaVersion(pTSchema) < schemaVersion(pCfg->schema)) {
if (pTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
pTable->schema[pTable->numOfSchemas++] = tdDupSchema(pCfg->schema);
......@@ -475,8 +474,8 @@ int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) {
if (isChanged) {
char *buf = malloc(1024 * 1024);
int bufLen = 0;
tsdbEncodeTable(pTable, buf, &bufLen);
tsdbInsertMetaRecord(pMeta->mfh, pTable->tableId.uid, buf, bufLen);
tsdbEncodeTable(buf, pTable);
// tsdbInsertMetaRecord(pMeta->mfh, pTable->tableId.uid, buf, bufLen);
free(buf);
}
......@@ -966,7 +965,7 @@ static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
return 0;
}
static void tsdbClearTableCfg(STableCfg *config) {
void tsdbClearTableCfg(STableCfg *config) {
if (config) {
if (config->schema) tdFreeSchema(config->schema);
if (config->tagSchema) tdFreeSchema(config->tagSchema);
......@@ -985,7 +984,7 @@ static void *tsdbEncodeTableName(void *buf, tstr *name) {
memcpy(pBuf, name->data, name->len);
pBuf = POINTER_SHIFT(pBuf, name->len);
return POINTER_DISTANCE(pBuf, buf);
return pBuf;
}
static void *tsdbDecodeTableName(void *buf, tstr **name) {
......
......@@ -20,6 +20,39 @@
#include "tscompression.h"
#include "tsdbMain.h"
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
SCompBlock *pCompBlock, bool isLast, bool isSuperBlock);
static int compareKeyBlock(const void *arg1, const void *arg2);
static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols);
static int compTSKEY(const void *key1, const void *key2);
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize);
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded);
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx);
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey);
static void tsdbResetHelperFileImpl(SRWHelper *pHelper);
static int tsdbInitHelperFile(SRWHelper *pHelper);
static void tsdbDestroyHelperFile(SRWHelper *pHelper);
static void tsdbResetHelperTableImpl(SRWHelper *pHelper);
static void tsdbResetHelperTable(SRWHelper *pHelper);
static void tsdbInitHelperTable(SRWHelper *pHelper);
static void tsdbDestroyHelperTable(SRWHelper *pHelper);
static void tsdbResetHelperBlockImpl(SRWHelper *pHelper);
static void tsdbResetHelperBlock(SRWHelper *pHelper);
static int tsdbInitHelperBlock(SRWHelper *pHelper);
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type);
static int comparColIdCompCol(const void *arg1, const void *arg2);
static int comparColIdDataCol(const void *arg1, const void *arg2);
static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf);
static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds,
SDataCols *pDataCols);
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bufferSize);
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols);
static void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx);
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx);
// ---------------------- INTERNAL FUNCTIONS ----------------------
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
return tsdbInitHelper(pHelper, pRepo, TSDB_READ_HELPER);
......@@ -69,7 +102,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) {
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
char *fnameDup = strdup(pHelper->files.headF.fname);
if (fnameDup == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
......@@ -437,7 +470,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
return 0;
}
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
int tsdbloadcompdata(srwhelper *phelper, scompblock *pcompblock, void *target) {
ASSERT(pCompBlock->numOfSubBlocks <= 1);
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
......
......@@ -589,7 +589,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
}
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj));
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
......@@ -836,7 +836,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
char* pData = NULL;
// the schema version info is embeded in SDataRow
STSchema* pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row));
STSchema* pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
int32_t numOfRowCols = schemaNCols(pSchema);
int32_t i = 0, j = 0;
......
......@@ -31,7 +31,7 @@ typedef void (*_ref_fn_t)(const void* pObj);
_ref_fn_t end; \
} _ref_func = {.begin = (s), .end = (e)};
#define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1));
#define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1))
#define T_REF_INC_WITH_CB(x, p) \
do { \
......@@ -41,7 +41,7 @@ typedef void (*_ref_fn_t)(const void* pObj);
} \
} while (0)
#define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1));
#define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1))
#define T_REF_DEC_WITH_CB(x, p) \
do { \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册