提交 82d144a6 编写于 作者: H Hongze Cheng

more

上级 f318fdfd
......@@ -70,9 +70,6 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fi
// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact);
// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
// static void tsdbStartCommit(STsdbRepo *pRepo);
// static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
// static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
// static int tsdbCreateCommitIters(SCommitH *pCommith);
// static void tsdbDestroyCommitIters(SCommitH *pCommith);
// static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
......@@ -80,11 +77,11 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fi
// static void tsdbDestroyCommitH(SCommitH *pCommith);
// static int tsdbGetFidLevel(int fid, SRtn *pRtn);
// static int tsdbNextCommitFid(SCommitH *pCommith);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
// static int tsdbWriteBlockInfo(SCommitH *pCommih);
// static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int tsdbWriteBlockInfo(SCommitH *pCommih);
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
......@@ -95,6 +92,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
......@@ -372,7 +370,6 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
return -1;
}
}
#if 0
if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
0) {
......@@ -398,7 +395,6 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
return -1;
}
#endif
return 0;
}
......@@ -582,107 +578,107 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// extern int32_t tsTsdbMetaCompactRatio;
// int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
// SBlockIdx *pIdx) {
// size_t nSupBlocks;
// size_t nSubBlocks;
// uint32_t tlen;
// SBlockInfo *pBlkInfo;
// int64_t offset;
// SBlock * pBlock;
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
SBlockIdx *pIdx) {
size_t nSupBlocks;
size_t nSubBlocks;
uint32_t tlen;
SBlockInfo *pBlkInfo;
int64_t offset;
SBlock * pBlock;
// memset(pIdx, 0, sizeof(*pIdx));
memset(pIdx, 0, sizeof(*pIdx));
// nSupBlocks = taosArrayGetSize(pSupA);
// nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA);
nSupBlocks = taosArrayGetSize(pSupA);
nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA);
// if (nSupBlocks <= 0) {
// // No data (data all deleted)
// return 0;
// }
if (nSupBlocks <= 0) {
// No data (data all deleted)
return 0;
}
// tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
// if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
// pBlkInfo = *ppBuf;
tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
pBlkInfo = *ppBuf;
// pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
// pBlkInfo->tid = TABLE_TID(pTable);
// pBlkInfo->uid = TABLE_UID(pTable);
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
pBlkInfo->tid = TABLE_TID(pTable);
pBlkInfo->uid = TABLE_UID(pTable);
// memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock));
// if (nSubBlocks > 0) {
// memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock));
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock));
if (nSubBlocks > 0) {
memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock));
// for (int i = 0; i < nSupBlocks; i++) {
// pBlock = pBlkInfo->blocks + i;
for (int i = 0; i < nSupBlocks; i++) {
pBlock = pBlkInfo->blocks + i;
// if (pBlock->numOfSubBlocks > 1) {
// pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
// }
// }
// }
if (pBlock->numOfSubBlocks > 1) {
pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
}
}
}
// taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
// if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) {
// return -1;
// }
if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) {
return -1;
}
// tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
// // Set pIdx
// pBlock = taosArrayGetLast(pSupA);
// Set pIdx
pBlock = taosArrayGetLast(pSupA);
// pIdx->tid = TABLE_TID(pTable);
// pIdx->uid = TABLE_UID(pTable);
// pIdx->hasLast = pBlock->last ? 1 : 0;
// pIdx->maxKey = pBlock->keyLast;
// pIdx->numOfBlocks = (uint32_t)nSupBlocks;
// pIdx->len = tlen;
// pIdx->offset = (uint32_t)offset;
pIdx->tid = TABLE_TID(pTable);
pIdx->uid = TABLE_UID(pTable);
pIdx->hasLast = pBlock->last ? 1 : 0;
pIdx->maxKey = pBlock->keyLast;
pIdx->numOfBlocks = (uint32_t)nSupBlocks;
pIdx->len = tlen;
pIdx->offset = (uint32_t)offset;
// return 0;
// }
return 0;
}
// int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
// SBlockIdx *pBlkIdx;
// size_t nidx = taosArrayGetSize(pIdxA);
// int tlen = 0, size;
// int64_t offset;
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
SBlockIdx *pBlkIdx;
size_t nidx = taosArrayGetSize(pIdxA);
int tlen = 0, size;
int64_t offset;
// if (nidx <= 0) {
// // All data are deleted
// pHeadf->info.offset = 0;
// pHeadf->info.len = 0;
// return 0;
// }
if (nidx <= 0) {
// All data are deleted
pHeadf->info.offset = 0;
pHeadf->info.len = 0;
return 0;
}
// for (size_t i = 0; i < nidx; i++) {
// pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i);
for (size_t i = 0; i < nidx; i++) {
pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i);
// size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
// if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1;
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1;
// void *ptr = POINTER_SHIFT(*ppBuf, tlen);
// tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
void *ptr = POINTER_SHIFT(*ppBuf, tlen);
tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
// tlen += size;
// }
tlen += size;
}
// tlen += sizeof(TSCKSUM);
// if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
// taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen);
tlen += sizeof(TSCKSUM);
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen);
// if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) {
// return -1;
// }
if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) {
return -1;
}
// tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM)));
// pHeadf->info.offset = (uint32_t)offset;
// pHeadf->info.len = tlen;
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM)));
pHeadf->info.offset = (uint32_t)offset;
pHeadf->info.len = tlen;
// return 0;
// }
return 0;
}
// // =================== Commit Meta Data
// static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) {
......@@ -1078,27 +1074,25 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
}
nextKey = tsdbNextIterKey(pIter->pIter);
} else {
// // Only commit memory data
// if (pBlock == NULL) {
// if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) {
// TSDB_RUNLOCK_TABLE(pIter->pTable);
// return -1;
// }
// } else {
// if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) {
// TSDB_RUNLOCK_TABLE(pIter->pTable);
// return -1;
// }
// }
// nextKey = tsdbNextIterKey(pIter->pIter);
// Only commit memory data
if (pBlock == NULL) {
if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) {
return -1;
}
} else {
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) {
return -1;
}
}
nextKey = tsdbNextIterKey(pIter->pIter);
}
}
// if (tsdbWriteBlockInfo(pCommith) < 0) {
// tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
// TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
// return -1;
// }
if (tsdbWriteBlockInfo(pCommith) < 0) {
tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
return -1;
}
return 0;
}
......@@ -1274,61 +1268,60 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))));
}
// static int tsdbWriteBlockInfo(SCommitH *pCommih) {
// SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
// SBlockIdx blkIdx;
// STable * pTable = TSDB_COMMIT_TABLE(pCommih);
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
SBlockIdx blkIdx;
STable * pTable = TSDB_COMMIT_TABLE(pCommih);
// if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void
// **)(&(TSDB_COMMIT_BUF(pCommih))),
// &blkIdx) < 0) {
// return -1;
// }
if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))),
&blkIdx) < 0) {
return -1;
}
// if (blkIdx.numOfBlocks == 0) {
// return 0;
// }
if (blkIdx.numOfBlocks == 0) {
return 0;
}
// if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return -1;
// }
if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
// return 0;
// }
return 0;
}
// static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
// STsdbCfg * pCfg = REPO_CFG(pRepo);
// SMergeInfo mInfo;
// int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
// SDFile * pDFile;
// bool isLast;
// SBlock block;
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SMergeInfo mInfo;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
SDFile * pDFile;
bool isLast;
SBlock block;
// while (true) {
// tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0,
// pCfg->update, &mInfo);
while (true) {
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0,
pCfg->update, &mInfo);
// if (pCommith->pDataCols->numOfRows <= 0) break;
if (pCommith->pDataCols->numOfRows <= 0) break;
// if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
// pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
// isLast = false;
// } else {
// pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
// isLast = true;
// }
if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
isLast = false;
} else {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
isLast = true;
}
// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
// if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) {
// return -1;
// }
// }
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) {
return -1;
}
}
// return 0;
// }
return 0;
}
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册