提交 5ff68c3c 编写于 作者: H Hongze Cheng

TD-353

上级 f4ca77f7
......@@ -117,6 +117,8 @@ typedef enum {
TSDB_FILE_TYPE_DATA,
TSDB_FILE_TYPE_LAST,
TSDB_FILE_TYPE_MAX,
TSDB_FILE_TYPE_NHEAD,
TSDB_FILE_TYPE_NLAST
} TSDB_FILE_TYPE;
typedef struct {
......@@ -272,7 +274,7 @@ typedef struct {
#define TABLE_TYPE(t) (t)->type
#define TABLE_NAME(t) (t)->name
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
#define TALBE_UID(t) (t)->tableId.uid
#define TABLE_UID(t) (t)->tableId.uid
#define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->suid
#define TABLE_LASTKEY(t) (t)->lastKey
......@@ -328,6 +330,9 @@ int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
void tsdbFitRetention(STsdbRepo* pRepo);
int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
void* tsdbEncodeSFileInfo(void* buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
int tsdbCpySFile(SFile* src, SFile* dst);
// ------------------ tsdbRWHelper.c
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
......@@ -359,17 +364,16 @@ int tsdbWriteCompInfo(SRWHelper* pHelper);
int tsdbWriteCompIdx(SRWHelper* pHelper);
int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
int tsdbloadcompdata(srwhelper* phelper, scompblock* pcompblock, void* target);
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SDataCols* pDataCols, int blkIdx, int16_t* colIds, int numOfColIds);
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SDataCols* target);
int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
void* tsdbEncodeSFileInfo(void* buf, const STsdbFileInfo* pInfo);
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
// ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
char* tsdbGetMetaFileName(char* rootDir);
char* tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type);
......@@ -379,43 +383,6 @@ char* tsdbGetDataDirName(char* rootDir);
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
#if 0
// --------- Helper state
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
void tsdbDestroyHelper(SRWHelper *pHelper);
void tsdbResetHelper(SRWHelper *pHelper);
// --------- For set operations
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup);
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo);
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError);
// --------- For read operations
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target);
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target);
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target);
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds);
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target);
void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols);
// --------- For write operations
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols);
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper);
int tsdbWriteCompInfo(SRWHelper *pHelper);
int tsdbWriteCompIdx(SRWHelper *pHelper);
// --------- Other functions need to further organize
void tsdbFitRetention(STsdbRepo *pRepo);
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
void tsdbAdjustCacheBlocks(STsdbCache *pCache);
int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
int tsdbUpdateFileHeader(SFile *pFile, uint32_t version);
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -29,15 +29,13 @@
#include "tutil.h"
#include "ttime.h"
const char *tsdbFileSuffix[] = {".head", ".data", ".last"};
const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".h"};
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
static void tsdbDestroyFile(SFile *pFile);
static int compFGroup(const void *arg1, const void *arg2);
static int keyFGroupCompFunc(const void *key, const void *fgroup);
static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup);
static void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo);
static void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo);
// ---------------- INTERNAL FUNCTIONS ----------------
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
......@@ -323,6 +321,40 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
return 0;
}
void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) {
buf = taosEncodeFixedU32(buf, pInfo->offset);
buf = taosEncodeFixedU32(buf, pInfo->len);
buf = taosEncodeFixedU64(buf, pInfo->size);
buf = taosEncodeFixedU64(buf, pInfo->tombSize);
buf = taosEncodeFixedU32(buf, pInfo->totalBlocks);
buf = taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
return buf;
}
void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->offset));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU64(buf, &(pInfo->size));
buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
return buf;
}
int tsdbCpySFile(SFile *src, SFile *dst) {
*dst = *src;
dst->fname = strdup(dst->fname);
if (dst->fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
// ---------------- LOCAL FUNCTIONS ----------------
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
uint32_t version;
......@@ -404,26 +436,4 @@ static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
remove(fileGroup.files[type].fname);
tsdbDestroyFile(&fileGroup.files[type]);
}
}
static void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) {
buf = taosEncodeFixedU32(buf, pInfo->offset);
buf = taosEncodeFixedU32(buf, pInfo->len);
buf = taosEncodeFixedU64(buf, pInfo->size);
buf = taosEncodeFixedU64(buf, pInfo->tombSize);
buf = taosEncodeFixedU32(buf, pInfo->totalBlocks);
buf = taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
return buf;
}
static void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->offset));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU64(buf, &(pInfo->size));
buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
return buf;
}
\ No newline at end of file
......@@ -63,6 +63,8 @@ static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIt
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables);
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
static int keyFGroupCompFunc(const void *key, const void *fgroup);
// Function declaration
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
......@@ -198,7 +200,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
}
} else {
SFileGroup *pFGroup =
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), compFGroupKey, TD_GE);
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE);
if (pFGroup->fileId == fid) {
strcpy(fname, pFGroup->files[(*index) % 3].fname);
} else {
......@@ -248,7 +250,7 @@ void tsdbStartStream(TSDB_REPO_T *repo) {
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TALBE_UID(pTable), TABLE_TID(pTable), pTable->sql,
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
tsdbGetTableSchema(pTable));
}
}
......@@ -744,7 +746,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) {
tsdbError("vgId:%d table %s tid %d uid %ld timestamp is out of range! now " PRId64 " maxKey " PRId64
" minKey " PRId64,
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TALBE_UID(pTable), now, minKey, maxKey);
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1;
}
......@@ -802,7 +804,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
SFileGroup *pFGroup = NULL;
SFileGroupIter iter;
SRWHelper rhelper = {{0}};
SRWHelper rhelper = {0};
if (tsdbInitReadHelper(&rhelper, pRepo) < 0) goto _err;
......@@ -874,6 +876,47 @@ static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
tsdbTrace("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables);
}
static int keyFGroupCompFunc(const void *key, const void *fgroup) {
int fid = *(int *)key;
SFileGroup *pFGroup = (SFileGroup *)fgroup;
if (fid == pFGroup->fileId) {
return 0;
} else {
return fid > pFGroup->fileId ? 1 : -1;
}
}
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
// TODO
// STsdbCache *pCache = pRepo->tsdbCache;
// int oldNumOfBlocks = pCache->totalCacheBlocks;
// tsdbLockRepo((TsdbRepoT *)pRepo);
// ASSERT(pCache->totalCacheBlocks != totalBlocks);
// if (pCache->totalCacheBlocks < totalBlocks) {
// ASSERT(pCache->totalCacheBlocks == pCache->pool.numOfCacheBlocks);
// int blocksToAdd = pCache->totalCacheBlocks - totalBlocks;
// pCache->totalCacheBlocks = totalBlocks;
// for (int i = 0; i < blocksToAdd; i++) {
// if (tsdbAddCacheBlockToPool(pCache) < 0) {
// tsdbUnLockRepo((TsdbRepoT *)pRepo);
// tsdbError("tsdbId:%d, failed to add cache block to cache pool", pRepo->config.tsdbId);
// return -1;
// }
// }
// } else {
// pCache->totalCacheBlocks = totalBlocks;
// tsdbAdjustCacheBlocks(pCache);
// }
// pRepo->config.totalBlocks = totalBlocks;
// tsdbUnLockRepo((TsdbRepoT *)pRepo);
// tsdbTrace("vgId:%d, tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks, totalBlocks);
return 0;
}
#if 0
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
......
......@@ -51,14 +51,16 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
TSKEY key = dataRowKey(row);
SMemTable * pMemTable = pRepo->mem;
STableData *pTableData = NULL;
SSkipList * pSList = NULL;
int bytes = 0;
if (pMemTable != NULL && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
pMemTable->tData[TABLE_TID(pTable)]->uid == TALBE_UID(pTable)) {
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
pTableData = pMemTable->tData[TABLE_TID(pTable)];
pSList = pTableData->pData;
}
tSkipListNewNodeInfo(pTableData, &level, &headSize);
tSkipListNewNodeInfo(pSList, &level, &headSize);
bytes = headSize + dataRowLen(row);
SSkipListNode *pNode = tsdbAllocBytes(pRepo, bytes);
......@@ -75,7 +77,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
pMemTable = pRepo->mem;
pTableData = pMemTable->tData[TABLE_TID(pTable)];
if (pTableData == NULL || pTableData->uid != TALBE_UID(pTable)) {
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem)
pMemTable->tData[TABLE_TID(pTable)] = NULL;
tsdbFreeTableData(pTableData);
......@@ -92,7 +94,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
}
ASSERT((pTableData != NULL) && pTableData->uid == TALBE_UID(pTable));
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
if (tSkipListPut(pTableData->pData, pNode) == NULL) {
tsdbFreeBytes(pRepo, (void *)pNode, bytes);
......@@ -112,7 +114,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
}
tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TALBE_UID(pTable), key);
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), key);
return 0;
}
......@@ -322,7 +324,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
goto _err;
}
pTableData->uid = TALBE_UID(pTable);
pTableData->uid = TABLE_UID(pTable);
pTableData->keyFirst = INT64_MAX;
pTableData->keyLast = 0;
pTableData->numOfRows = 0;
......@@ -355,7 +357,6 @@ static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); }
static void *tsdbCommitData(void *arg) {
STsdbRepo * pRepo = (STsdbRepo *)arg;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &pRepo->config;
SDataCols * pDataCols = NULL;
......@@ -382,7 +383,7 @@ static void *tsdbCommitData(void *arg) {
if ((pDataCols = tdNewDataCols(pMem->maxRowBytes, pMem->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMem->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
REPO_ID(pRepo), pMem->maxRowBytes, pMem->maxCols, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _exit;
}
......@@ -446,7 +447,6 @@ static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
char * dataDir = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &pRepo->config;
SFileGroup *pGroup = NULL;
......@@ -467,7 +467,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
return -1;
}
if ((pGroup = tsdbCreateFGroupIfNeed(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) {
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid, pCfg->maxTables)) == NULL) {
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
......@@ -485,7 +485,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue;
tsdbSetHelperTable(pHelper, pTable, pRepo);
tsdbSetHelperTable(pHelper, pIter->pTable, pRepo);
if (pIter->pIter != NULL) {
tdInitDataCols(pDataCols, tsdbGetTableSchema(pIter->pTable));
......@@ -574,7 +574,7 @@ static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) {
if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
for (int i = 0; i < pCfg->maxTables; i++) {
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TALBE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
......@@ -624,7 +624,7 @@ static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIter
if (dataRowKey(row) > maxKey) break;
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row));
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
if (pSchema == NULL) {
// TODO: deal with the error here
ASSERT(false);
......
......@@ -58,7 +58,7 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
STable *pTable = tsdbGetTableByUid(pMeta, pCfg->tableId.uid);
if (pTable != NULL) {
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TALBE_UID(pTable));
TABLE_TID(pTable), TABLE_UID(pTable));
return TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
}
......@@ -121,7 +121,7 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, varDataVal(pTable->name),
tableId.tid, tableId.uid);
tsdbRemoveTableFromMeta(pMeta, pTable, true);
tsdbRemoveTableFromMeta(pRepo, pTable, true);
return 0;
}
......@@ -473,7 +473,6 @@ int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) {
if (isChanged) {
char *buf = malloc(1024 * 1024);
int bufLen = 0;
tsdbEncodeTable(buf, pTable);
// tsdbInsertMetaRecord(pMeta->mfh, pTable->tableId.uid, buf, bufLen);
free(buf);
......@@ -536,7 +535,6 @@ static int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
STsdbMeta *pMeta = pRepo->tsdbMeta;
STable * pTable = NULL;
if (!taosCheckChecksumWhole((uint8_t *)cont, contLen)) {
......@@ -546,21 +544,22 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
tsdbDecodeTable(cont, &pTable);
if (tsdbAddTableToMeta(pMeta, pTable, false) < 0) {
if (tsdbAddTableToMeta(pRepo, pTable, false) < 0) {
tsdbFreeTable(pTable);
return -1;
}
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is restored from file", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TALBE_UID(pTable));
TABLE_TID(pTable), TABLE_UID(pTable));
return 0;
}
static void tsdbOrgMeta(void *pHandle) {
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = &pRepo->config;
for (int i = 1; i < pMeta->maxTables; i++) {
for (int i = 1; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) {
tsdbAddTableIntoIndex(pMeta, pTable);
......@@ -572,7 +571,7 @@ static char *getTagIndexKey(const void *pData) {
STable *pTable = *(STable **)pData;
STSchema *pSchema = tsdbGetTableTagSchema(pTable);
STColumn *pCol = schemaColAt(DEFAULT_TAG_INDEX_COLUMN);
STColumn *pCol = schemaColAt(pSchema, DEFAULT_TAG_INDEX_COLUMN);
void * res = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId);
return res;
}
......@@ -596,7 +595,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
goto _err;
}
STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->sname, tsize);
TALBE_UID(pTable) = pCfg->superUid;
TABLE_UID(pTable) = pCfg->superUid;
TABLE_TID(pTable) = -1;
TABLE_SUID(pTable) = -1;
pTable->pSuper = NULL;
......@@ -627,7 +626,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
goto _err;
}
STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->name, tsize);
TALBE_UID(pTable) = pCfg->tableId.uid;
TABLE_UID(pTable) = pCfg->tableId.uid;
TABLE_TID(pTable) = pCfg->tableId.tid;
if (pCfg->type == TSDB_CHILD_TABLE) {
......@@ -741,7 +740,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TALBE_UID(pTable));
TABLE_TID(pTable), TABLE_UID(pTable));
return 0;
_err:
......@@ -778,7 +777,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
pMeta->nTables--;
}
taosHashRemove(pMeta->uid, (char *)(&(TALBE_UID(pTable))), sizeof(TALBE_UID(pTable)));
taosHashRemove(pMeta->uidMap, (char *)(&(TABLE_UID(pTable))), sizeof(TABLE_UID(pTable)));
if (rmFromIdx) tsdbUnlockRepoMeta(pRepo);
}
......@@ -804,8 +803,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
}
pNode->level = level;
SSkipList *list = pSTable->pIndex;
(STable *)(SL_GET_NODE_DATA(pNode)) = pTable;
memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *));
tSkipListPut(pSTable->pIndex, pNode);
return 0;
......@@ -814,7 +812,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
ASSERT(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
STable *pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
STable *pSTable = pTable->pSuper;
ASSERT(pSTable != NULL);
STSchema *pSchema = tsdbGetTableTagSchema(pTable);
......@@ -1008,7 +1006,7 @@ static void *tsdbEncodeTable(void *buf, STable *pTable) {
buf = taosEncodeFixedU8(buf, pTable->type);
buf = tsdbEncodeTableName(buf, pTable->name);
buf = taosEncodeFixedU64(buf, TALBE_UID(pTable));
buf = taosEncodeFixedU64(buf, TABLE_UID(pTable));
buf = taosEncodeFixedI32(buf, TABLE_TID(pTable));
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) {
......@@ -1038,10 +1036,12 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
uint8_t type = 0;
buf = taosDecodeFixedU8(buf, &(pTable->type));
buf = taosDecodeFixedU8(buf, &type);
pTable->type = type;
buf = tsdbDecodeTableName(buf, &(pTable->name));
buf = taosDecodeFixedU64(buf, &TALBE_UID(pTable));
buf = taosDecodeFixedU64(buf, &TABLE_UID(pTable));
buf = taosDecodeFixedI32(buf, &TABLE_TID(pTable));
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) {
......
......@@ -52,6 +52,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols);
static void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx);
static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx);
static void tsdbDestroyHelperBlock(SRWHelper *pHelper);
// ---------------------- INTERNAL FUNCTIONS ----------------------
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
......@@ -99,20 +100,13 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// Set the files
pHelper->files.fid = pGroup->fileId;
pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
tsdbCpySFile(&pHelper->files.headF, &pGroup->files[TSDB_FILE_TYPE_HEAD]);
tsdbCpySFile(&pHelper->files.dataF, &pGroup->files[TSDB_FILE_TYPE_DATA]);
tsdbCpySFile(&pHelper->files.lastF, &pGroup->files[TSDB_FILE_TYPE_LAST]);
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
char *fnameDup = strdup(pHelper->files.headF.fname);
if (fnameDup == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
char *dataDir = dirname(fnameDup);
tsdbGetFileName(dataDir, pHelper->files.fid, ".h", pHelper->files.nHeadF.fname);
tsdbGetFileName(dataDir, pHelper->files.fid, ".l", pHelper->files.nLastF.fname);
free((void *)fnameDup);
pHelper->files.nHeadF.fname = tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD);
pHelper->files.nLastF.fname = tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST);
}
// Open the files
......@@ -123,10 +117,10 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// Create and open .h
if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1;
// size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM);
// size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM);
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
TSDB_FILE_HEAD_SIZE, hpHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
errno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
......@@ -136,7 +130,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE)
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
TSDB_FILE_HEAD_SIZE, hpHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
......@@ -229,13 +223,15 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
* -1 for failure
*/
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
ASSERT(pDataCols->numOfRows > 0);
SCompBlock compBlock;
int rowsToWrite = 0;
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
STsdbCfg *pCfg = &pHelper->pRepo->config;
ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD));
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose
......@@ -249,7 +245,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
SFile *pWFile = NULL;
bool isLast = false;
if (rowsToWrite >= pHelper->config.minRowsPerFileBlock) {
if (rowsToWrite >= pCfg->minRowsPerFileBlock) {
pWFile = &(pHelper->files.dataF);
} else {
isLast = true;
......@@ -293,7 +289,9 @@ _err:
}
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
STsdbCfg *pCfg = &pHelper->pRepo->config;
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompBlock compBlock;
if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) {
......@@ -305,7 +303,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
if (pCompBlock->numOfSubBlocks > 1) {
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows > 0 &&
pHelper->pDataCols[0]->numOfRows < pHelper->config.minRowsPerFileBlock);
pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
return -1;
......@@ -355,16 +353,17 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
}
int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
STsdbCfg *pCfg = &pHelper->pRepo->config;
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
if (offset < 0) return -1;
SFile *pFile = &(pHelper->files.nHeadF);
pFile->info.offset = offset;
// TODO: change the implementation of pHelper->pBuffer
void *buf = pHelper->pBuffer;
for (uint32_t i = 0; i < pHelper->config.maxTables; i++) {
for (uint32_t i = 0; i < pCfg->maxTables; i++) {
SCompIdx *pCompIdx = pHelper->pCompIdx + i;
if (pCompIdx->offset > 0) {
int drift = POINTER_DISTANCE(buf, pHelper->pBuffer);
......@@ -386,6 +385,8 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
}
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
STsdbCfg *pCfg = &(pHelper->pRepo->config);
ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN);
if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
......@@ -425,7 +426,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) {
uint32_t tid = 0;
if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) return -1;
ASSERT(tid > 0 && tid < pHelper->config.maxTables);
ASSERT(tid > 0 && tid < pCfg->maxTables);
if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1;
......@@ -470,7 +471,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
return 0;
}
int tsdbloadcompdata(srwhelper *phelper, scompblock *pcompblock, void *target) {
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
ASSERT(pCompBlock->numOfSubBlocks <= 1);
int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd;
......@@ -614,12 +615,14 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite,
SCompBlock *pCompBlock, bool isLast, bool isSuperBlock) {
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
ASSERT(isLast ? rowsToWrite < pHelper->config.minRowsPerFileBlock : true);
STsdbCfg *pCfg = &(pHelper->pRepo->config);
SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
int64_t offset = 0;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true);
offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
......@@ -668,8 +671,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
if (pHelper->config.compress) {
if (pHelper->config.compress == TWO_STAGE_COMP) {
if (pCfg->compression) {
if (pCfg->compression == TWO_STAGE_COMP) {
pHelper->compBuffer = trealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES);
if (pHelper->compBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
......@@ -678,7 +681,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
}
pCompCol->len = (*(tDataTypeDesc[pDataCol->type].compFunc))(
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->pBuffer) - lsize, pHelper->config.compress,
(char *)pDataCol->pData, tlen, rowsToWrite, tptr, tsizeof(pHelper->pBuffer) - lsize, pCfg->compression,
pHelper->compBuffer, tsizeof(pHelper->compBuffer));
} else {
pCompCol->len = tlen;
......@@ -710,7 +713,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
// Update pCompBlock membership vairables
pCompBlock->last = isLast;
pCompBlock->offset = offset;
pCompBlock->algorithm = pHelper->config.compress;
pCompBlock->algorithm = pCfg->compression;
pCompBlock->numOfRows = rowsToWrite;
pCompBlock->sversion = pHelper->tableInfo.sversion;
pCompBlock->len = (int32_t)lsize;
......@@ -747,6 +750,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// TODO: set pHelper->hasOldBlock
int rowsWritten = 0;
SCompBlock compBlock = {0};
STsdbCfg * pCfg = &pHelper->pRepo->config;
ASSERT(pDataCols->numOfRows > 0);
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
......@@ -760,13 +764,13 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock &&
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock &&
blkIdx == pIdx->numOfBlocks - 1);
int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
int defaultRowsToWrite = pCfg->maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) &&
(blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pCfg->minRowsPerFileBlock) &&
(pHelper->files.nLastF.fd) < 0) {
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
goto _err;
......@@ -780,7 +784,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// Write
SFile *pWFile = NULL;
bool isLast = false;
if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.minRowsPerFileBlock) {
if (pHelper->pDataCols[0]->numOfRows >= pCfg->minRowsPerFileBlock) {
pWFile = &(pHelper->files.dataF);
} else {
isLast = true;
......@@ -804,7 +808,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
int rows1 =
tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
// rows2: max number of rows the block can have more
int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
int rows2 = pCfg->maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
// rows3: number of rows between this block and the next block
int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);
......@@ -812,7 +816,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
if ((rows2 >= rows1) && (blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
((!blockAtIdx(pHelper, blkIdx)->last) ||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) &&
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pCfg->minRowsPerFileBlock) &&
(pHelper->files.nLastF.fd < 0)))) {
rowsWritten = rows1;
bool isLast = false;
......@@ -841,7 +845,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
while (true) {
if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pHelper->pDataCols[0]->numOfRows,
pDataCols, &iter2, rowsWritten, pHelper->config.maxRowsPerFileBlock * 4 / 5);
pDataCols, &iter2, rowsWritten, pCfg->maxRowsPerFileBlock * 4 / 5);
ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
......
......@@ -2119,7 +2119,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
}
SArray* res = taosArrayInit(8, sizeof(STableId));
STSchema* pTagSchema = tsdbGetTableTagSchema(tsdbGetMeta(tsdb), pTable);
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
// no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册