提交 31824d9f 编写于 作者: H Hongze Cheng

get rid of maxTables in TSDB

上级 50b968c2
......@@ -199,6 +199,7 @@ typedef struct {
// ------------------ tsdbRWHelper.c
typedef struct {
int32_t tid;
uint32_t len;
uint32_t offset;
uint32_t hasLast : 2;
......@@ -262,9 +263,14 @@ typedef struct {
typedef struct {
uint64_t uid;
int32_t tid;
int32_t sversion;
} SHelperTable;
typedef struct {
SCompIdx* pIdxArray;
int numOfIdx;
int curIdx;
} SIdxH;
typedef struct {
tsdb_rw_helper_t type;
......@@ -272,7 +278,9 @@ typedef struct {
int8_t state;
// For file set usage
SHelperFile files;
SCompIdx* pCompIdx;
SIdxH idxH;
SCompIdx curCompIdx;
void* pWIdx;
// For table set usage
SHelperTable tableInfo;
SCompInfo* pCompInfo;
......@@ -284,7 +292,6 @@ typedef struct {
void* compBuffer; // Buffer for temperary compress/decompress purpose
} SRWHelper;
// Operations
// ------------------ tsdbMeta.c
#define TABLE_TYPE(t) (t)->type
......
......@@ -793,7 +793,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
for (int i = 1; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
SCompIdx *pIdx = &rhelper.pCompIdx[i];
SCompIdx *pIdx = &(rhelper.curCompIdx);
if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey;
}
......
......@@ -60,6 +60,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter
int *blkIdx);
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows);
static int tsdbCompareTidIdx(const void *key1, const void *key2);
// ---------------------- INTERNAL FUNCTIONS ----------------------
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
......@@ -251,17 +252,35 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
pHelper->tableInfo.tid = pTable->tableId.tid;
pHelper->tableInfo.uid = pTable->tableId.uid;
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
pHelper->tableInfo.sversion = schemaVersion(pSchema);
tdInitDataCols(pHelper->pDataCols[0], pSchema);
tdInitDataCols(pHelper->pDataCols[1], pSchema);
SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid;
if (pIdx->offset > 0) {
if (pIdx->uid != TABLE_UID(pTable)) {
memset((void *)pIdx, 0, sizeof(SCompIdx));
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
if (pHelper->idxH.numOfIdx > 0) {
if (pHelper->idxH.curIdx >= pHelper->idxH.numOfIdx) {
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx));
} else {
SCompIdx *pIdx = &(pHelper->idxH.pIdxArray[pHelper->idxH.curIdx]);
if (pIdx->tid == TABLE_TID(pTable)) {
pHelper->curCompIdx = *pIdx;
pHelper->idxH.curIdx++;
} else {
ASSERT(pIdx->tid > TABLE_TID(pTable));
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx));
}
}
} else {
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx));
}
} else {
// TODO: make it more efficient
void *ptr = bsearch(&TABLE_TID(pTable), (void *)pHelper->idxH.pIdxArray, pHelper->idxH.numOfIdx, sizeof(SCompIdx),
tsdbCompareTidIdx);
if (ptr == NULL) {
memset(&(pHelper->curCompIdx), 0, sizeof(SCompIdx));
} else {
if (pIdx->hasLast) pHelper->hasOldLastBlock = true;
pHelper->curCompIdx = *(SCompIdx *)ptr;
}
}
......@@ -272,8 +291,8 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
SCompIdx * pIdx = &(pHelper->pCompIdx[TABLE_TID(pCommitIter->pTable)]);
int blkIdx = 0;
SCompIdx *pIdx = &(pHelper->curCompIdx);
int blkIdx = 0;
ASSERT(pIdx->offset == 0 || pIdx->uid == TABLE_UID(pCommitIter->pTable));
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
......@@ -298,7 +317,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
STsdbCfg *pCfg = &pHelper->pRepo->config;
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompIdx * pIdx = &(pHelper->curCompIdx);
SCompBlock compBlock = {0};
if (TSDB_NLAST_FILE_OPENED(pHelper) && (pHelper->hasOldLastBlock)) {
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
......@@ -344,10 +363,11 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
}
int tsdbWriteCompInfo(SRWHelper *pHelper) {
off_t offset = 0;
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
if (pIdx->offset > 0) {
SCompIdx *pIdx = &(pHelper->curCompIdx);
off_t offset = 0;
if (pIdx->len > 0) {
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
offset = lseek(helperNewHeadF(pHelper)->fd, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), helperNewHeadF(pHelper)->fname,
......@@ -357,6 +377,8 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
}
pIdx->offset = offset;
pIdx->uid = pHelper->tableInfo.uid;
pIdx->tid = pHelper->tableInfo.tid;
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
if (tsendfile(helperNewHeadF(pHelper)->fd, helperHeadF(pHelper)->fd, NULL, pIdx->len) < pIdx->len) {
......@@ -365,9 +387,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
} else {
if (pIdx->len > 0) {
} else {
pHelper->pCompInfo->delimiter = TSDB_FILE_DELIMITER;
pHelper->pCompInfo->uid = pHelper->tableInfo.uid;
pHelper->pCompInfo->checksum = 0;
......@@ -383,6 +403,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
}
pIdx->offset = offset;
pIdx->uid = pHelper->tableInfo.uid;
pIdx->tid = pHelper->tableInfo.tid;
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
if (twrite(helperNewHeadF(pHelper)->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
......@@ -392,6 +413,17 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
return -1;
}
}
if (tsizeof(pHelper->pWIdx) < helperNewIdxF(pHelper)->info.len + sizeof(SCompIdx) + 12) {
pHelper->pWIdx = trealloc(pHelper->pWIdx, tsizeof(pHelper->pWIdx) == 0 ? 1024 : tsizeof(pHelper->pWIdx) * 2);
if (pHelper->pWIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
void *pBuf = POINTER_SHIFT(pHelper->pWIdx, helperNewIdxF(pHelper)->info.len);
helperNewIdxF(pHelper)->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx));
}
return 0;
......@@ -399,57 +431,43 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
STsdbCfg *pCfg = &pHelper->pRepo->config;
// STsdbCfg *pCfg = &pHelper->pRepo->config;
SFile *pFile = helperNewIdxF(pHelper);
void *buf = pHelper->pBuffer;
for (uint32_t i = 0; i < pCfg->maxTables; i++) {
SCompIdx *pCompIdx = pHelper->pCompIdx + i;
if (pCompIdx->offset > 0) {
int drift = POINTER_DISTANCE(buf, pHelper->pBuffer);
if (tsizeof(pHelper->pBuffer) - drift < 128) {
pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer) * 2);
if (pHelper->pBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
buf = POINTER_SHIFT(pHelper->pBuffer, drift);
taosEncodeVariantU32(&buf, i);
tsdbEncodeSCompIdx(&buf, pCompIdx);
pFile->info.len += sizeof(TSCKSUM);
if (tsizeof(pHelper->pWIdx) < pFile->info.len) {
pHelper->pWIdx = trealloc(pHelper->pWIdx, pFile->info.len);
if (pHelper->pWIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pWIdx, pFile->info.len);
int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);
if (twrite(pFile->fd, (void *)pHelper->pBuffer, tsize) < tsize) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname,
strerror(errno));
if (twrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < pFile->info.len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pFile->info.len = tsize;
return 0;
}
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
STsdbCfg *pCfg = &(pHelper->pRepo->config);
ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN);
SFile *pFile = helperIdxF(pHelper);
int fd = pFile->fd;
if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
// If not load from file, just load it in object
SFile *pFile = helperIdxF(pHelper);
int fd = pFile->fd;
memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx));
if (pFile->info.len > 0) {
if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -462,6 +480,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname,
pFile->info.len);
......@@ -470,27 +489,49 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
}
// Decode it
pHelper->idxH.numOfIdx = 0;
void *ptr = pHelper->pBuffer;
while (POINTER_DISTANCE(ptr, pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) {
uint32_t tid = 0;
if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) return -1;
ASSERT(tid > 0 && tid < pCfg->maxTables);
size_t tlen = tsizeof(pHelper->idxH.pIdxArray);
pHelper->idxH.numOfIdx++;
if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1;
if (tlen < pHelper->idxH.numOfIdx) {
pHelper->idxH.pIdxArray = (SCompIdx *)trealloc(pHelper->idxH.pIdxArray, (tlen == 0) ? 1024 : tlen * 2);
if (pHelper->idxH.pIdxArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
ptr = tsdbDecodeSCompIdx(ptr, &(pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1]));
if (ptr == NULL) {
tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname,
pFile->info.len);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
ASSERT(pHelper->idxH.numOfIdx == 1 || pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 1].tid >
pHelper->idxH.pIdxArray[pHelper->idxH.numOfIdx - 2].tid);
ASSERT(POINTER_DISTANCE(ptr, pHelper->pBuffer) <= pFile->info.len - sizeof(TSCKSUM));
}
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
// if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
// terrno = TAOS_SYSTEM_ERROR(errno);
// return -1;
// }
}
}
helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
pFile->info.len = 0;
}
// Copy the memory for outside usage
if (target) memcpy(target, pHelper->pCompIdx, tsizeof(pHelper->pCompIdx));
if (target && pHelper->idxH.numOfIdx > 0)
memcpy(target, pHelper->idxH.pIdxArray, sizeof(SCompIdx) * pHelper->idxH.numOfIdx);
return 0;
}
......@@ -498,7 +539,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET));
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompIdx *pIdx = &(pHelper->curCompIdx);
int fd = helperHeadF(pHelper)->fd;
......@@ -820,7 +861,7 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
}
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfBlocks);
ASSERT(pCompBlock->numOfSubBlocks == 1);
......@@ -867,7 +908,7 @@ _err:
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) {
ASSERT(pCompBlock->numOfSubBlocks == 0);
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
......@@ -951,7 +992,7 @@ _err:
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
ASSERT(pCompBlock->numOfSubBlocks == 1);
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);
......@@ -987,6 +1028,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
}
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
pHelper->idxH.numOfIdx = 0;
pHelper->idxH.curIdx = 0;
memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
helperIdxF(pHelper)->fd = -1;
helperHeadF(pHelper)->fd = -1;
......@@ -998,14 +1041,6 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
}
static int tsdbInitHelperFile(SRWHelper *pHelper) {
STsdbCfg *pCfg = &pHelper->pRepo->config;
size_t tsize = sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM);
pHelper->pCompIdx = (SCompIdx *)tmalloc(tsize);
if (pHelper->pCompIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbResetHelperFileImpl(pHelper);
return 0;
}
......@@ -1013,7 +1048,8 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) {
static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
tsdbCloseHelperFile(pHelper, false);
tsdbResetHelperFileImpl(pHelper);
tzfree(pHelper->pCompIdx);
tzfree(pHelper->idxH.pIdxArray);
tzfree(pHelper->pWIdx);
}
// ---------- Operations on Helper Table part
......@@ -1331,6 +1367,7 @@ _err:
static int tsdbEncodeSCompIdx(void **buf, SCompIdx *pIdx) {
int tlen = 0;
tlen += taosEncodeVariantI32(buf, pIdx->tid);
tlen += taosEncodeVariantU32(buf, pIdx->len);
tlen += taosEncodeVariantU32(buf, pIdx->offset);
tlen += taosEncodeFixedU8(buf, pIdx->hasLast);
......@@ -1346,6 +1383,7 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
uint32_t numOfBlocks = 0;
uint64_t value = 0;
if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL;
if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL;
......@@ -1363,7 +1401,7 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
STsdbCfg * pCfg = &(pHelper->pRepo->config);
STable * pTable = pCommitIter->pTable;
SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable);
SCompIdx * pIdx = &(pHelper->curCompIdx);
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
SCompBlock compBlock = {0};
......@@ -1410,7 +1448,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
int *blkIdx) {
STsdbCfg * pCfg = &(pHelper->pRepo->config);
STable * pTable = pCommitIter->pTable;
SCompIdx * pIdx = pHelper->pCompIdx + TABLE_TID(pTable);
SCompIdx * pIdx = &(pHelper->curCompIdx);
SCompBlock compBlock = {0};
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
......@@ -1605,4 +1643,14 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols,
if (tsdbWriteBlockToFile(pHelper, pFile, pDataCols, pCompBlock, isLast, true) < 0) return -1;
return 0;
}
static int tsdbCompareTidIdx(const void *key1, const void *key2) {
if (*(int32_t *)key1 > ((SCompIdx *)key2)->tid) {
return 1;
} else if (*(int32_t *)key1 < ((SCompIdx *)key2)->tid) {
return -1;
} else {
return 0;
}
}
\ No newline at end of file
......@@ -555,7 +555,9 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
pCheckInfo->numOfBlocks = 0;
SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb);
SCompIdx* compIndex = &pQueryHandle->rhelper.curCompIdx;
// no data block in this file, try next file
if (compIndex->len == 0 || compIndex->numOfBlocks == 0 || compIndex->uid != pCheckInfo->tableId.uid) {
......@@ -572,8 +574,6 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
pCheckInfo->compSize = compIndex->len;
}
tsdbSetHelperTable(&pQueryHandle->rhelper, pCheckInfo->pTableObj, pQueryHandle->pTsdb);
tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
SCompInfo* pCompInfo = pCheckInfo->pCompInfo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册