提交 7d3a57f5 编写于 作者: H Hongze Cheng

more

上级 724a3710
...@@ -54,6 +54,7 @@ typedef struct STsdbCfg { ...@@ -54,6 +54,7 @@ typedef struct STsdbCfg {
int32_t keep1; int32_t keep1;
int32_t keep2; int32_t keep2;
int8_t update; int8_t update;
int8_t compression;
} STsdbCfg; } STsdbCfg;
// STsdb // STsdb
......
...@@ -62,6 +62,37 @@ static void tsdbDestroyCommitH(SCommitH *pCommith); ...@@ -62,6 +62,37 @@ static void tsdbDestroyCommitH(SCommitH *pCommith);
static int tsdbCreateCommitIters(SCommitH *pCommith); static int tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith); static void tsdbDestroyCommitIters(SCommitH *pCommith);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbResetCommitFile(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
// static int tsdbCommitMeta(STsdbRepo *pRepo);
// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact);
// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
// static void tsdbStartCommit(STsdbRepo *pRepo);
// static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
// static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
// static int tsdbCreateCommitIters(SCommitH *pCommith);
// static void tsdbDestroyCommitIters(SCommitH *pCommith);
// static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
// static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo);
// static void tsdbDestroyCommitH(SCommitH *pCommith);
// static int tsdbGetFidLevel(int fid, SRtn *pRtn);
// static int tsdbNextCommitFid(SCommitH *pCommith);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
// static int tsdbWriteBlockInfo(SCommitH *pCommih);
// static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
// static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock);
static void tsdbResetCommitTable(SCommitH *pCommith);
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
// static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
// static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
// TSKEY maxKey, int maxRows, int8_t update);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did; SDiskID did;
...@@ -141,7 +172,6 @@ int tsdbCommit(STsdb *pRepo) { ...@@ -141,7 +172,6 @@ int tsdbCommit(STsdb *pRepo) {
// Loop to commit to each file // Loop to commit to each file
fid = tsdbNextCommitFid(&(commith)); fid = tsdbNextCommitFid(&(commith));
#if 0
while (true) { while (true) {
// Loop over both on disk and memory // Loop over both on disk and memory
if (pSet == NULL && fid == TSDB_IVLD_FID) break; if (pSet == NULL && fid == TSDB_IVLD_FID) break;
...@@ -179,7 +209,6 @@ int tsdbCommit(STsdb *pRepo) { ...@@ -179,7 +209,6 @@ int tsdbCommit(STsdb *pRepo) {
fid = tsdbNextCommitFid(&commith); fid = tsdbNextCommitFid(&commith);
} }
} }
#endif
tsdbDestroyCommitH(&commith); tsdbDestroyCommitH(&commith);
tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS);
...@@ -314,61 +343,62 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) { ...@@ -314,61 +343,62 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
} }
// static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
// STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
// STsdbCfg *pCfg = REPO_CFG(pRepo); STsdbCfg *pCfg = REPO_CFG(pRepo);
// ASSERT(pSet == NULL || pSet->fid == fid); ASSERT(pSet == NULL || pSet->fid == fid);
// tsdbResetCommitFile(pCommith); tsdbResetCommitFile(pCommith);
// tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));
// // Set and open files // Set and open files
// if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) {
// return -1; return -1;
// } }
// // Loop to commit each table data // Loop to commit each table data
// for (int tid = 1; tid < pCommith->niters; tid++) { for (int tid = 1; tid < pCommith->niters; tid++) {
// SCommitIter *pIter = pCommith->iters + tid; SCommitIter *pIter = pCommith->iters + tid;
// if (pIter->pTable == NULL) continue; if (pIter->pTable == NULL) continue;
// if (tsdbCommitToTable(pCommith, tid) < 0) { if (tsdbCommitToTable(pCommith, tid) < 0) {
// tsdbCloseCommitFile(pCommith, true); tsdbCloseCommitFile(pCommith, true);
// // revert the file change // revert the file change
// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
// return -1; return -1;
// } }
// } }
#if 0
// if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
// < 0) {
// 0) { tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
// tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); tsdbCloseCommitFile(pCommith, true);
// tsdbCloseCommitFile(pCommith, true); // revert the file change
// // revert the file change tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); return -1;
// return -1; }
// }
// if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) { if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) {
// tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
// tsdbCloseCommitFile(pCommith, true); tsdbCloseCommitFile(pCommith, true);
// // revert the file change // revert the file change
// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
// return -1; return -1;
// } }
// // Close commit file // Close commit file
// tsdbCloseCommitFile(pCommith, false); tsdbCloseCommitFile(pCommith, false);
// if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) { if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) {
// return -1; return -1;
// } }
// return 0; #endif
// } return 0;
}
static int tsdbCreateCommitIters(SCommitH *pCommith) { static int tsdbCreateCommitIters(SCommitH *pCommith) {
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
...@@ -416,479 +446,569 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { ...@@ -416,479 +446,569 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
pCommith->niters = 0; pCommith->niters = 0;
} }
#if 0 static void tsdbResetCommitFile(SCommitH *pCommith) {
#include "tsdbint.h" pCommith->isRFileSet = false;
pCommith->isDFileSame = false;
extern int32_t tsTsdbMetaCompactRatio; pCommith->isLFileSame = false;
taosArrayClear(pCommith->aBlkIdx);
}
static int tsdbCommitMeta(STsdbRepo *pRepo);
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact);
static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
static void tsdbStartCommit(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo, int eno);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith);
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo);
static void tsdbDestroyCommitH(SCommitH *pCommith);
static int tsdbGetFidLevel(int fid, SRtn *pRtn);
static int tsdbNextCommitFid(SCommitH *pCommith);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int tsdbWriteBlockInfo(SCommitH *pCommih);
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock);
static void tsdbResetCommitFile(SCommitH *pCommith);
static void tsdbResetCommitTable(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
SBlockIdx *pIdx) {
size_t nSupBlocks;
size_t nSubBlocks;
uint32_t tlen;
SBlockInfo *pBlkInfo;
int64_t offset;
SBlock * pBlock;
memset(pIdx, 0, sizeof(*pIdx));
nSupBlocks = taosArrayGetSize(pSupA); static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); SDiskID did;
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
if (nSupBlocks <= 0) { tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id));
// No data (data all deleted) if (did.level == TFS_UNDECIDED_LEVEL) {
return 0; terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
} }
tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); // Open read FSET
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; if (pSet) {
pBlkInfo = *ppBuf; if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) {
return -1;
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
pBlkInfo->tid = TABLE_TID(pTable);
pBlkInfo->uid = TABLE_UID(pTable);
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock));
if (nSubBlocks > 0) {
memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock));
for (int i = 0; i < nSupBlocks; i++) {
pBlock = pBlkInfo->blocks + i;
if (pBlock->numOfSubBlocks > 1) {
pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
}
}
} }
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); pCommith->isRFileSet = true;
if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1; return -1;
} }
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); tsdbDebug("vgId:%d FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet),
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
// Set pIdx } else {
pBlock = taosArrayGetLast(pSupA); pCommith->isRFileSet = false;
pIdx->tid = TABLE_TID(pTable);
pIdx->uid = TABLE_UID(pTable);
pIdx->hasLast = pBlock->last ? 1 : 0;
pIdx->maxKey = pBlock->keyLast;
pIdx->numOfBlocks = (uint32_t)nSupBlocks;
pIdx->len = tlen;
pIdx->offset = (uint32_t)offset;
return 0;
}
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
SBlockIdx *pBlkIdx;
size_t nidx = taosArrayGetSize(pIdxA);
int tlen = 0, size;
int64_t offset;
if (nidx <= 0) {
// All data are deleted
pHeadf->info.offset = 0;
pHeadf->info.len = 0;
return 0;
} }
for (size_t i = 0; i < nidx; i++) { // Set and open commit FSET
pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
// Create a new FSET to write data
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1;
void *ptr = POINTER_SHIFT(*ppBuf, tlen);
tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
tlen += size; if (tsdbCreateDFileSet(pWSet, true) < 0) {
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno));
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
} }
tlen += sizeof(TSCKSUM);
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen);
if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) {
return -1; return -1;
} }
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); pCommith->isDFileSame = false;
pHeadf->info.offset = (uint32_t)offset; pCommith->isLFileSame = false;
pHeadf->info.len = tlen;
return 0;
}
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
} else {
did.level = TSDB_FSET_LEVEL(pSet);
did.id = TSDB_FSET_ID(pSet);
// =================== Commit Meta Data pCommith->wSet.fid = fid;
static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) { pCommith->wSet.state = 0;
STsdbFS * pfs = REPO_FS(pRepo);
SMFile * pOMFile = pfs->cstatus->pmf;
SDiskID did;
// Create/Open a meta file or open the existing file // TSDB_FILE_HEAD
if (pOMFile == NULL) { SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
// Create a new meta file tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
did.level = TFS_PRIMARY_LEVEL; if (tsdbCreateDFile(pWHeadf, true) < 0) {
did.id = TFS_PRIMARY_ID; tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
tsdbInitMFile(pMf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); tstrerror(terrno));
if (open && tsdbCreateMFile(pMf, true) < 0) { if (pCommith->isRFileSet) {
tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1; return -1;
} }
}
tsdbInfo("vgId:%d meta file %s is created to commit", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMf)); // TSDB_FILE_DATA
} else { SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh));
tsdbInitMFileEx(pMf, pOMFile); SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
if (open && tsdbOpenMFile(pMf, O_WRONLY) < 0) { tsdbInitDFileEx(pWDataf, pRDataf);
tsdbError("vgId:%d failed to open META file since %s", REPO_ID(pRepo), tstrerror(terrno)); if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1; return -1;
} }
} }
pCommith->isDFileSame = true;
return 0; // TSDB_FILE_LAST
} SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh));
SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith);
if (pRLastf->info.size < 32 * 1024) {
tsdbInitDFileEx(pWLastf, pRLastf);
pCommith->isLFileSame = true;
static int tsdbCommitMeta(STsdbRepo *pRepo) { if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
STsdbFS * pfs = REPO_FS(pRepo); tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
SMemTable *pMem = pRepo->imem; tstrerror(terrno));
SMFile * pOMFile = pfs->cstatus->pmf;
SMFile mf;
SActObj * pAct = NULL;
SActCont * pCont = NULL;
SListNode *pNode = NULL;
ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0);
if (listNEles(pMem->actList) <= 0) {
// no meta data to commit, just keep the old meta file
tsdbUpdateMFile(pfs, pOMFile);
if (tsTsdbMetaCompactRatio > 0) {
if (tsdbInitCommitMetaFile(pRepo, &mf, false) < 0) {
return -1;
}
int ret = tsdbCompactMetaFile(pRepo, pfs, &mf);
if (ret < 0) tsdbError("compact meta file error");
return ret; tsdbCloseDFileSet(pWSet);
} tsdbRemoveDFile(pWHeadf);
return 0; if (pCommith->isRFileSet) {
} else { tsdbCloseAndUnsetFSet(&(pCommith->readh));
if (tsdbInitCommitMetaFile(pRepo, &mf, true) < 0) {
return -1; return -1;
} }
} }
} else {
tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false;
// Loop to write if (tsdbCreateDFile(pWLastf, true) < 0) {
while ((pNode = tdListPopHead(pMem->actList)) != NULL) { tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
pAct = (SActObj *)pNode->data;
if (pAct->act == TSDB_UPDATE_META) {
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, false) < 0) {
tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
tsdbCloseMFile(&mf);
(void)tsdbApplyMFileChange(&mf, pOMFile);
// TODO: need to reload metaCache
return -1;
}
} else if (pAct->act == TSDB_DROP_META) {
if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) {
tsdbError("vgId:%d failed to drop META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno)); tstrerror(terrno));
tsdbCloseMFile(&mf);
tsdbApplyMFileChange(&mf, pOMFile); tsdbCloseDFileSet(pWSet);
// TODO: need to reload metaCache (void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1; return -1;
} }
} else {
ASSERT(false);
}
} }
if (tsdbUpdateMFileHeader(&mf) < 0) {
tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno));
tsdbApplyMFileChange(&mf, pOMFile);
// TODO: need to reload metaCache
return -1;
} }
TSDB_FILE_FSYNC(&mf);
tsdbCloseMFile(&mf);
tsdbUpdateMFile(pfs, &mf);
if (tsTsdbMetaCompactRatio > 0 && tsdbCompactMetaFile(pRepo, pfs, &mf) < 0) {
tsdbError("compact meta file error");
} }
return 0; return 0;
} }
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord) { // extern int32_t tsTsdbMetaCompactRatio;
int tlen = 0;
tlen += taosEncodeFixedU64(buf, pRecord->uid);
tlen += taosEncodeFixedI64(buf, pRecord->offset);
tlen += taosEncodeFixedI64(buf, pRecord->size);
return tlen;
}
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { // int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
buf = taosDecodeFixedU64(buf, &(pRecord->uid)); // SBlockIdx *pIdx) {
buf = taosDecodeFixedI64(buf, &(pRecord->offset)); // size_t nSupBlocks;
buf = taosDecodeFixedI64(buf, &(pRecord->size)); // size_t nSubBlocks;
// uint32_t tlen;
// SBlockInfo *pBlkInfo;
// int64_t offset;
// SBlock * pBlock;
return buf; // memset(pIdx, 0, sizeof(*pIdx));
}
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) { // nSupBlocks = taosArrayGetSize(pSupA);
char buf[64] = "\0"; // nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA);
void * pBuf = buf;
SKVRecord rInfo;
int64_t offset;
// Seek to end of meta file // if (nSupBlocks <= 0) {
offset = tsdbSeekMFile(pMFile, 0, SEEK_END); // // No data (data all deleted)
if (offset < 0) { // return 0;
return -1; // }
}
rInfo.offset = offset; // tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
rInfo.uid = uid; // if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
rInfo.size = contLen; // pBlkInfo = *ppBuf;
int tlen = tsdbEncodeKVRecord((void **)(&pBuf), &rInfo); // pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
if (tsdbAppendMFile(pMFile, buf, tlen, NULL) < tlen) { // pBlkInfo->tid = TABLE_TID(pTable);
return -1; // pBlkInfo->uid = TABLE_UID(pTable);
}
if (tsdbAppendMFile(pMFile, cont, contLen, NULL) < contLen) { // memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock));
return -1; // if (nSubBlocks > 0) {
} // memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock));
tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); // for (int i = 0; i < nSupBlocks; i++) {
// pBlock = pBlkInfo->blocks + i;
SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache; // if (pBlock->numOfSubBlocks > 1) {
// pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
// }
// }
// }
pMFile->info.nRecords++; // taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
SKVRecord *pRecord = taosHashGet(cache, (void *)&uid, sizeof(uid)); // if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) {
if (pRecord != NULL) { // return -1;
pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord)); // }
} else {
pMFile->info.nRecords++;
}
taosHashPut(cache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
return 0; // tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
}
static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { // // Set pIdx
SKVRecord rInfo = {0}; // pBlock = taosArrayGetLast(pSupA);
char buf[128] = "\0";
SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid)); // pIdx->tid = TABLE_TID(pTable);
if (pRecord == NULL) { // pIdx->uid = TABLE_UID(pTable);
tsdbError("failed to drop META record with key %" PRIu64 " since not find", uid); // pIdx->hasLast = pBlock->last ? 1 : 0;
return -1; // pIdx->maxKey = pBlock->keyLast;
} // pIdx->numOfBlocks = (uint32_t)nSupBlocks;
// pIdx->len = tlen;
// pIdx->offset = (uint32_t)offset;
rInfo.offset = -pRecord->offset; // return 0;
rInfo.uid = pRecord->uid; // }
rInfo.size = pRecord->size;
void *pBuf = buf; // int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
tsdbEncodeKVRecord(&pBuf, &rInfo); // SBlockIdx *pBlkIdx;
// size_t nidx = taosArrayGetSize(pIdxA);
// int tlen = 0, size;
// int64_t offset;
if (tsdbAppendMFile(pMFile, buf, sizeof(SKVRecord), NULL) < 0) { // if (nidx <= 0) {
return -1; // // All data are deleted
} // pHeadf->info.offset = 0;
// pHeadf->info.len = 0;
// return 0;
// }
pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, sizeof(SKVRecord)); // for (size_t i = 0; i < nidx; i++) {
pMFile->info.nDels++; // pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i);
pMFile->info.nRecords--;
pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid)); // size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
return 0; // if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1;
}
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { // void *ptr = POINTER_SHIFT(*ppBuf, tlen);
float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords); // tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size);
float compactRatio = (float)(tsTsdbMetaCompactRatio)/100;
if (delPercent < compactRatio && tombPercent < compactRatio) { // tlen += size;
return 0; // }
}
if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { // tlen += sizeof(TSCKSUM);
tsdbError("open meta file %s compact fail", pMFile->f.rname); // if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
return -1; // taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen);
}
tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 ",size:%" PRId64, // if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) {
tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size); // return -1;
// }
SMFile mf; // tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM)));
SDiskID did; // pHeadf->info.offset = (uint32_t)offset;
// pHeadf->info.len = tlen;
// first create tmp meta file // return 0;
did.level = TFS_PRIMARY_LEVEL; // }
did.id = TFS_PRIMARY_ID;
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)) + 1);
if (tsdbCreateMFile(&mf, true) < 0) { // // =================== Commit Meta Data
tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); // static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) {
return -1; // STsdbFS * pfs = REPO_FS(pRepo);
} // SMFile * pOMFile = pfs->cstatus->pmf;
// SDiskID did;
// // Create/Open a meta file or open the existing file
// if (pOMFile == NULL) {
// // Create a new meta file
// did.level = TFS_PRIMARY_LEVEL;
// did.id = TFS_PRIMARY_ID;
// tsdbInitMFile(pMf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
// if (open && tsdbCreateMFile(pMf, true) < 0) {
// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
// return -1;
// }
tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); // tsdbInfo("vgId:%d meta file %s is created to commit", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMf));
// } else {
// tsdbInitMFileEx(pMf, pOMFile);
// if (open && tsdbOpenMFile(pMf, O_WRONLY) < 0) {
// tsdbError("vgId:%d failed to open META file since %s", REPO_ID(pRepo), tstrerror(terrno));
// return -1;
// }
// }
// second iterator metaCache // return 0;
int code = -1; // }
int64_t maxBufSize = 1024;
SKVRecord *pRecord;
void *pBuf = NULL;
pBuf = malloc((size_t)maxBufSize); // static int tsdbCommitMeta(STsdbRepo *pRepo) {
if (pBuf == NULL) { // STsdbFS * pfs = REPO_FS(pRepo);
goto _err; // SMemTable *pMem = pRepo->imem;
} // SMFile * pOMFile = pfs->cstatus->pmf;
// SMFile mf;
// SActObj * pAct = NULL;
// SActCont * pCont = NULL;
// SListNode *pNode = NULL;
// ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0);
// if (listNEles(pMem->actList) <= 0) {
// // no meta data to commit, just keep the old meta file
// tsdbUpdateMFile(pfs, pOMFile);
// if (tsTsdbMetaCompactRatio > 0) {
// if (tsdbInitCommitMetaFile(pRepo, &mf, false) < 0) {
// return -1;
// }
// int ret = tsdbCompactMetaFile(pRepo, pfs, &mf);
// if (ret < 0) tsdbError("compact meta file error");
// init Comp // return ret;
assert(pfs->metaCacheComp == NULL); // }
pfs->metaCacheComp = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); // return 0;
if (pfs->metaCacheComp == NULL) { // } else {
goto _err; // if (tsdbInitCommitMetaFile(pRepo, &mf, true) < 0) {
} // return -1;
// }
// }
pRecord = taosHashIterate(pfs->metaCache, NULL); // // Loop to write
while (pRecord) { // while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { // pAct = (SActObj *)pNode->data;
tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), // if (pAct->act == TSDB_UPDATE_META) {
tstrerror(terrno)); // pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
goto _err; // if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, false) < 0) {
} // tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
if (pRecord->size > maxBufSize) { // tstrerror(terrno));
maxBufSize = pRecord->size; // tsdbCloseMFile(&mf);
void* tmp = realloc(pBuf, (size_t)maxBufSize); // (void)tsdbApplyMFileChange(&mf, pOMFile);
if (tmp == NULL) { // // TODO: need to reload metaCache
goto _err; // return -1;
} // }
pBuf = tmp; // } else if (pAct->act == TSDB_DROP_META) {
} // if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) {
int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); // tsdbError("vgId:%d failed to drop META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
if (nread < 0) { // tstrerror(terrno));
tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), // tsdbCloseMFile(&mf);
tstrerror(terrno)); // tsdbApplyMFileChange(&mf, pOMFile);
goto _err; // // TODO: need to reload metaCache
} // return -1;
// }
// } else {
// ASSERT(false);
// }
// }
if (nread < pRecord->size) { // if (tsdbUpdateMFileHeader(&mf) < 0) {
tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", // tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno));
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); // tsdbApplyMFileChange(&mf, pOMFile);
goto _err; // // TODO: need to reload metaCache
} // return -1;
// }
if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, (int)pRecord->size, true) < 0) { // TSDB_FILE_FSYNC(&mf);
tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid, // tsdbCloseMFile(&mf);
tstrerror(terrno)); // tsdbUpdateMFile(pfs, &mf);
goto _err;
}
pRecord = taosHashIterate(pfs->metaCache, pRecord); // if (tsTsdbMetaCompactRatio > 0 && tsdbCompactMetaFile(pRepo, pfs, &mf) < 0) {
} // tsdbError("compact meta file error");
code = 0; // }
_err: // return 0;
if (code == 0) TSDB_FILE_FSYNC(&mf); // }
tsdbCloseMFile(&mf);
tsdbCloseMFile(pMFile);
if (code == 0) { // int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord) {
// rename meta.tmp -> meta // int tlen = 0;
tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf), TSDB_FILE_FULL_NAME(pMFile)); // tlen += taosEncodeFixedU64(buf, pRecord->uid);
taosRename(mf.f.aname,pMFile->f.aname); // tlen += taosEncodeFixedI64(buf, pRecord->offset);
tstrncpy(mf.f.aname, pMFile->f.aname, TSDB_FILENAME_LEN); // tlen += taosEncodeFixedI64(buf, pRecord->size);
tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN);
// update current meta file info
pfs->nstatus->pmf = NULL;
tsdbUpdateMFile(pfs, &mf);
taosHashCleanup(pfs->metaCache); // return tlen;
pfs->metaCache = pfs->metaCacheComp; // }
pfs->metaCacheComp = NULL;
} else {
// remove meta.tmp file
remove(mf.f.aname);
taosHashCleanup(pfs->metaCacheComp);
pfs->metaCacheComp = NULL;
}
tfree(pBuf); // void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) {
// buf = taosDecodeFixedU64(buf, &(pRecord->uid));
// buf = taosDecodeFixedI64(buf, &(pRecord->offset));
// buf = taosDecodeFixedI64(buf, &(pRecord->size));
ASSERT(mf.info.nDels == 0); // return buf;
ASSERT(mf.info.tombSize == 0); // }
tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, // static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) {
code,mf.info.nRecords,mf.info.size); // char buf[64] = "\0";
return code; // void * pBuf = buf;
} // SKVRecord rInfo;
// int64_t offset;
// =================== Commit Time-Series Data // // Seek to end of meta file
#if 0 // offset = tsdbSeekMFile(pMFile, 0, SEEK_END);
static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { // if (offset < 0) {
for (int i = 0; i < nIters; i++) { // return -1;
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); // }
if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return true;
} // rInfo.offset = offset;
return false; // rInfo.uid = uid;
} // rInfo.size = contLen;
#endif
// int tlen = tsdbEncodeKVRecord((void **)(&pBuf), &rInfo);
// if (tsdbAppendMFile(pMFile, buf, tlen, NULL) < tlen) {
// return -1;
// }
// if (tsdbAppendMFile(pMFile, cont, contLen, NULL) < contLen) {
// return -1;
// }
// tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)));
// SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache;
// pMFile->info.nRecords++;
// SKVRecord *pRecord = taosHashGet(cache, (void *)&uid, sizeof(uid));
// if (pRecord != NULL) {
// pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord));
// } else {
// pMFile->info.nRecords++;
// }
// taosHashPut(cache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
// return 0;
// }
// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
// SKVRecord rInfo = {0};
// char buf[128] = "\0";
// SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid));
// if (pRecord == NULL) {
// tsdbError("failed to drop META record with key %" PRIu64 " since not find", uid);
// return -1;
// }
// rInfo.offset = -pRecord->offset;
// rInfo.uid = pRecord->uid;
// rInfo.size = pRecord->size;
// void *pBuf = buf;
// tsdbEncodeKVRecord(&pBuf, &rInfo);
// if (tsdbAppendMFile(pMFile, buf, sizeof(SKVRecord), NULL) < 0) {
// return -1;
// }
// pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, sizeof(SKVRecord));
// pMFile->info.nDels++;
// pMFile->info.nRecords--;
// pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
// taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid));
// return 0;
// }
// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
// float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords);
// float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size);
// float compactRatio = (float)(tsTsdbMetaCompactRatio)/100;
// if (delPercent < compactRatio && tombPercent < compactRatio) {
// return 0;
// }
// if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) {
// tsdbError("open meta file %s compact fail", pMFile->f.rname);
// return -1;
// }
// tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64
// ",size:%" PRId64,
// tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size);
// SMFile mf;
// SDiskID did;
// // first create tmp meta file
// did.level = TFS_PRIMARY_LEVEL;
// did.id = TFS_PRIMARY_ID;
// tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)) + 1);
// if (tsdbCreateMFile(&mf, true) < 0) {
// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno));
// return -1;
// }
// tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf));
// // second iterator metaCache
// int code = -1;
// int64_t maxBufSize = 1024;
// SKVRecord *pRecord;
// void *pBuf = NULL;
// pBuf = malloc((size_t)maxBufSize);
// if (pBuf == NULL) {
// goto _err;
// }
// // init Comp
// assert(pfs->metaCacheComp == NULL);
// pfs->metaCacheComp = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
// if (pfs->metaCacheComp == NULL) {
// goto _err;
// }
// pRecord = taosHashIterate(pfs->metaCache, NULL);
// while (pRecord) {
// if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) {
// tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
// tstrerror(terrno));
// goto _err;
// }
// if (pRecord->size > maxBufSize) {
// maxBufSize = pRecord->size;
// void* tmp = realloc(pBuf, (size_t)maxBufSize);
// if (tmp == NULL) {
// goto _err;
// }
// pBuf = tmp;
// }
// int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size);
// if (nread < 0) {
// tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile),
// tstrerror(terrno));
// goto _err;
// }
// if (nread < pRecord->size) {
// tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d",
// REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread);
// goto _err;
// }
// if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, (int)pRecord->size, true) < 0) {
// tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid,
// tstrerror(terrno));
// goto _err;
// }
// pRecord = taosHashIterate(pfs->metaCache, pRecord);
// }
// code = 0;
// _err:
// if (code == 0) TSDB_FILE_FSYNC(&mf);
// tsdbCloseMFile(&mf);
// tsdbCloseMFile(pMFile);
// if (code == 0) {
// // rename meta.tmp -> meta
// tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf),
// TSDB_FILE_FULL_NAME(pMFile)); taosRename(mf.f.aname,pMFile->f.aname); tstrncpy(mf.f.aname, pMFile->f.aname,
// TSDB_FILENAME_LEN); tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN);
// // update current meta file info
// pfs->nstatus->pmf = NULL;
// tsdbUpdateMFile(pfs, &mf);
// taosHashCleanup(pfs->metaCache);
// pfs->metaCache = pfs->metaCacheComp;
// pfs->metaCacheComp = NULL;
// } else {
// // remove meta.tmp file
// remove(mf.f.aname);
// taosHashCleanup(pfs->metaCacheComp);
// pfs->metaCacheComp = NULL;
// }
// tfree(pBuf);
// ASSERT(mf.info.nDels == 0);
// ASSERT(mf.info.tombSize == 0);
// tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64,
// code,mf.info.nRecords,mf.info.size);
// return code;
// }
// // =================== Commit Time-Series Data
// #if 0
// static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
// for (int i = 0; i < nIters; i++) {
// TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
// if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return true;
// }
// return false;
// }
// #endif
static int tsdbCommitToTable(SCommitH *pCommith, int tid) { static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
SCommitIter *pIter = pCommith->iters + tid; SCommitIter *pIter = pCommith->iters + tid;
...@@ -896,17 +1016,13 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { ...@@ -896,17 +1016,13 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
tsdbResetCommitTable(pCommith); tsdbResetCommitTable(pCommith);
TSDB_RLOCK_TABLE(pIter->pTable);
// Set commit table // Set commit table
if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) { if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; return -1;
} }
// No disk data and no memory data, just return // No disk data and no memory data, just return
if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) { if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return 0; return 0;
} }
...@@ -917,7 +1033,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { ...@@ -917,7 +1033,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
if (pCommith->readh.pBlkIdx) { if (pCommith->readh.pBlkIdx) {
if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; return -1;
} }
...@@ -938,7 +1053,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { ...@@ -938,7 +1053,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) ||
(pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) {
if (tsdbMoveBlock(pCommith, bidx) < 0) { if (tsdbMoveBlock(pCommith, bidx) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; return -1;
} }
...@@ -949,43 +1063,41 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { ...@@ -949,43 +1063,41 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
pBlock = NULL; pBlock = NULL;
} }
} else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
// merge pBlock data and memory data // // merge pBlock data and memory data
if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { // if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable); // TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; // return -1;
} // }
bidx++; // bidx++;
if (bidx < nBlocks) { // if (bidx < nBlocks) {
pBlock = pCommith->readh.pBlkInfo->blocks + bidx; // pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
} else { // } else {
pBlock = NULL; // pBlock = NULL;
} // }
nextKey = tsdbNextIterKey(pIter->pIter); // nextKey = tsdbNextIterKey(pIter->pIter);
} else { } else {
// Only commit memory data // // Only commit memory data
if (pBlock == NULL) { // if (pBlock == NULL) {
if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { // if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable); // TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; // return -1;
} // }
} else { // } else {
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { // if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable); // TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1; // return -1;
} // }
} // }
nextKey = tsdbNextIterKey(pIter->pIter); // nextKey = tsdbNextIterKey(pIter->pIter);
} }
} }
TSDB_RUNLOCK_TABLE(pIter->pTable); // if (tsdbWriteBlockInfo(pCommith) < 0) {
// tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith),
if (tsdbWriteBlockInfo(pCommith) < 0) { // TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno));
tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), // return -1;
TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); // }
return -1;
}
return 0; return 0;
} }
...@@ -1023,8 +1135,8 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { ...@@ -1023,8 +1135,8 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
} }
} }
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf) { bool isSuper, void **ppBuf, void **ppCBuf) {
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData; SBlockData *pBlockData;
int64_t offset = 0; int64_t offset = 0;
...@@ -1090,8 +1202,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo ...@@ -1090,8 +1202,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
pBlockCol = pBlockData->cols + tcol; pBlockCol = pBlockData->cols + tcol;
tptr = POINTER_SHIFT(pBlockData, lsize); tptr = POINTER_SHIFT(pBlockData, lsize);
if (pCfg->compression == TWO_STAGE_COMP && if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
return -1; return -1;
} }
...@@ -1162,132 +1273,133 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo ...@@ -1162,132 +1273,133 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)))); (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))));
} }
// static int tsdbWriteBlockInfo(SCommitH *pCommih) {
// SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
// SBlockIdx blkIdx;
// STable * pTable = TSDB_COMMIT_TABLE(pCommih);
static int tsdbWriteBlockInfo(SCommitH *pCommih) { // if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); // **)(&(TSDB_COMMIT_BUF(pCommih))),
SBlockIdx blkIdx; // &blkIdx) < 0) {
STable * pTable = TSDB_COMMIT_TABLE(pCommih); // return -1;
// }
if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))),
&blkIdx) < 0) {
return -1;
}
if (blkIdx.numOfBlocks == 0) {
return 0;
}
if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { // if (blkIdx.numOfBlocks == 0) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); // return 0;
STsdbCfg * pCfg = REPO_CFG(pRepo); // }
SMergeInfo mInfo;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
SDFile * pDFile;
bool isLast;
SBlock block;
while (true) { // if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, // terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
pCfg->update, &mInfo); // return -1;
// }
if (pCommith->pDataCols->numOfRows <= 0) break; // return 0;
// }
if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { // static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith); // STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
isLast = false; // STsdbCfg * pCfg = REPO_CFG(pRepo);
} else { // SMergeInfo mInfo;
pDFile = TSDB_COMMIT_LAST_FILE(pCommith); // int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
isLast = true; // SDFile * pDFile;
} // bool isLast;
// SBlock block;
// while (true) {
// tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0,
// pCfg->update, &mInfo);
// if (pCommith->pDataCols->numOfRows <= 0) break;
// if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
// pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
// isLast = false;
// } else {
// pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
// isLast = true;
// }
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; // if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { // if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) {
return -1; // return -1;
} // }
} // }
return 0; // return 0;
} // }
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { // static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); // STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo); // STsdbCfg * pCfg = REPO_CFG(pRepo);
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; // int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx; // SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
TSKEY keyLimit; // TSKEY keyLimit;
int16_t colId = 0; // int16_t colId = 0;
SMergeInfo mInfo; // SMergeInfo mInfo;
SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; // SBlock subBlocks[TSDB_MAX_SUBBLOCKS];
SBlock block, supBlock; // SBlock block, supBlock;
SDFile * pDFile; // SDFile * pDFile;
if (bidx == nBlocks - 1) { // if (bidx == nBlocks - 1) {
keyLimit = pCommith->maxKey; // keyLimit = pCommith->maxKey;
} else { // } else {
keyLimit = pBlock[1].keyFirst - 1; // keyLimit = pBlock[1].keyFirst - 1;
} // }
SSkipListIterator titer = *(pIter->pIter); // SSkipListIterator titer = *(pIter->pIter);
if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1; // if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1;
tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, // tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData,
pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); // pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo);
if (mInfo.nOperations == 0) { // if (mInfo.nOperations == 0) {
// no new data to insert (all updates denied) // // no new data to insert (all updates denied)
if (tsdbMoveBlock(pCommith, bidx) < 0) { // if (tsdbMoveBlock(pCommith, bidx) < 0) {
return -1; // return -1;
} // }
*(pIter->pIter) = titer; // *(pIter->pIter) = titer;
} else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { // } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) {
// Ignore the block // // Ignore the block
ASSERT(0); // ASSERT(0);
*(pIter->pIter) = titer; // *(pIter->pIter) = titer;
} else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { // } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) {
// Add a sub-block // // Add a sub-block
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, // tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols,
pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, // pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows,
&mInfo); // pCfg->update, &mInfo);
if (pBlock->last) { // if (pBlock->last) {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith); // pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
} else { // } else {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith); // pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
} // }
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; // if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1;
if (pBlock->numOfSubBlocks == 1) { // if (pBlock->numOfSubBlocks == 1) {
subBlocks[0] = *pBlock; // subBlocks[0] = *pBlock;
subBlocks[0].numOfSubBlocks = 0; // subBlocks[0].numOfSubBlocks = 0;
} else { // } else {
memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), // memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
sizeof(SBlock) * pBlock->numOfSubBlocks); // sizeof(SBlock) * pBlock->numOfSubBlocks);
} // }
subBlocks[pBlock->numOfSubBlocks] = block; // subBlocks[pBlock->numOfSubBlocks] = block;
supBlock = *pBlock; // supBlock = *pBlock;
supBlock.keyFirst = mInfo.keyFirst; // supBlock.keyFirst = mInfo.keyFirst;
supBlock.keyLast = mInfo.keyLast; // supBlock.keyLast = mInfo.keyLast;
supBlock.numOfSubBlocks++; // supBlock.numOfSubBlocks++;
supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; // supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed;
supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); // supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock);
if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; // if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1;
} else { // } else {
if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; // if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1; // if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return
} // -1;
// }
return 0; // return 0;
} // }
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
...@@ -1342,113 +1454,107 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const ...@@ -1342,113 +1454,107 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
return 0; return 0;
} }
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) { // static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); // isLastOneBlock) {
STsdbCfg * pCfg = REPO_CFG(pRepo); // STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
SBlock block; // STsdbCfg * pCfg = REPO_CFG(pRepo);
SDFile * pDFile; // SBlock block;
bool isLast; // SDFile * pDFile;
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); // bool isLast;
// int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
int biter = 0;
while (true) { // int biter = 0;
tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, // while (true) {
pCfg->update); // tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows,
// pCfg->update);
if (pCommith->pDataCols->numOfRows == 0) break;
// if (pCommith->pDataCols->numOfRows == 0) break;
if (isLastOneBlock) {
if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { // if (isLastOneBlock) {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith); // if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
isLast = true; // pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
} else { // isLast = true;
pDFile = TSDB_COMMIT_DATA_FILE(pCommith); // } else {
isLast = false; // pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
} // isLast = false;
} else { // }
pDFile = TSDB_COMMIT_DATA_FILE(pCommith); // } else {
isLast = false; // pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
} // isLast = false;
// }
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
}
return 0;
}
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update) {
TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX;
STSchema *pSchema = NULL;
ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); // if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
tdResetDataCols(pTarget); // if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
// }
while (true) { // return 0;
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); // }
SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
if (row == NULL || memRowKey(row) > maxKey) {
key2 = INT64_MAX;
} else {
key2 = memRowKey(row);
}
if (key1 == INT64_MAX && key2 == INT64_MAX) break; // static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
// TSKEY maxKey, int maxRows, int8_t update) {
// TSKEY key1 = INT64_MAX;
// TSKEY key2 = INT64_MAX;
// STSchema *pSchema = NULL;
// ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
// tdResetDataCols(pTarget);
// while (true) {
// key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
// SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
// if (row == NULL || memRowKey(row) > maxKey) {
// key2 = INT64_MAX;
// } else {
// key2 = memRowKey(row);
// }
if (key1 < key2) { // if (key1 == INT64_MAX && key2 == INT64_MAX) break;
for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
pTarget->numOfRows++; // if (key1 < key2) {
(*iter)++; // for (int i = 0; i < pDataCols->numOfCols; i++) {
} else if (key1 > key2) { // //TODO: dataColAppendVal may fail
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { // dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); // pTarget->maxPoints);
ASSERT(pSchema != NULL); // }
}
tdAppendMemRowToDataCol(row, pSchema, pTarget, true); // pTarget->numOfRows++;
// (*iter)++;
// } else if (key1 > key2) {
// if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
// pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
// ASSERT(pSchema != NULL);
// }
tSkipListIterNext(pCommitIter->pIter); // tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
} else {
if (update != TD_ROW_OVERWRITE_UPDATE) {
//copy disk data
for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; // tSkipListIterNext(pCommitIter->pIter);
} // } else {
if (update != TD_ROW_DISCARD_UPDATE) { // if (update != TD_ROW_OVERWRITE_UPDATE) {
//copy mem data // //copy disk data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { // for (int i = 0; i < pDataCols->numOfCols; i++) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); // //TODO: dataColAppendVal may fail
ASSERT(pSchema != NULL); // dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
} // pTarget->maxPoints);
// }
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); // if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
} // }
(*iter)++; // if (update != TD_ROW_DISCARD_UPDATE) {
tSkipListIterNext(pCommitIter->pIter); // //copy mem data
} // if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
// pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
// ASSERT(pSchema != NULL);
// }
if (pTarget->numOfRows >= maxRows) break; // tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
} // }
} // (*iter)++;
// tSkipListIterNext(pCommitIter->pIter);
// }
static void tsdbResetCommitFile(SCommitH *pCommith) { // if (pTarget->numOfRows >= maxRows) break;
pCommith->isRFileSet = false; // }
pCommith->isDFileSame = false; // }
pCommith->isLFileSame = false;
taosArrayClear(pCommith->aBlkIdx);
}
static void tsdbResetCommitTable(SCommitH *pCommith) { static void tsdbResetCommitTable(SCommitH *pCommith) {
taosArrayClear(pCommith->aSubBlk); taosArrayClear(pCommith->aSubBlk);
...@@ -1456,131 +1562,6 @@ static void tsdbResetCommitTable(SCommitH *pCommith) { ...@@ -1456,131 +1562,6 @@ static void tsdbResetCommitTable(SCommitH *pCommith) {
pCommith->pTable = NULL; pCommith->pTable = NULL;
} }
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
SDiskID did;
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
// Open read FSET
if (pSet) {
if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) {
return -1;
}
pCommith->isRFileSet = true;
if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet),
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
} else {
pCommith->isRFileSet = false;
}
// Set and open commit FSET
if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
// Create a new FSET to write data
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateDFileSet(pWSet, true) < 0) {
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno));
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
}
return -1;
}
pCommith->isDFileSame = false;
pCommith->isLFileSame = false;
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
} else {
did.level = TSDB_FSET_LEVEL(pSet);
did.id = TSDB_FSET_ID(pSet);
pCommith->wSet.fid = fid;
pCommith->wSet.state = 0;
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pWHeadf, true) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
tstrerror(terrno));
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
// TSDB_FILE_DATA
SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh));
SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
tsdbInitDFileEx(pWDataf, pRDataf);
if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
pCommith->isDFileSame = true;
// TSDB_FILE_LAST
SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh));
SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith);
if (pRLastf->info.size < 32 * 1024) {
tsdbInitDFileEx(pWLastf, pRLastf);
pCommith->isLFileSame = true;
if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
} else {
tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false;
if (tsdbCreateDFile(pWLastf, true) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
(void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
}
return 0;
}
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
if (pCommith->isRFileSet) { if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh)); tsdbCloseAndUnsetFSet(&(pCommith->readh));
...@@ -1592,46 +1573,45 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { ...@@ -1592,46 +1573,45 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
} }
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { // static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); // STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo); // STsdbCfg * pCfg = REPO_CFG(pRepo);
int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; // int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;
ASSERT(mergeRows > 0);
if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) {
if (pBlock->last) {
if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true;
} else {
if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true;
}
}
return false; // ASSERT(mergeRows > 0);
}
int tsdbApplyRtn(STsdbRepo *pRepo) { // if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) {
SRtn rtn; // if (pBlock->last) {
SFSIter fsiter; // if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true;
STsdbFS * pfs = REPO_FS(pRepo); // } else {
SDFileSet *pSet; // if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true;
// }
// }
// Get retention snapshot // return false;
tsdbGetRtnSnap(pRepo, &rtn); // }
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); // int tsdbApplyRtn(STsdbRepo *pRepo) {
while ((pSet = tsdbFSIterNext(&fsiter))) { // SRtn rtn;
if (pSet->fid < rtn.minFid) { // SFSIter fsiter;
tsdbInfo("vgId:%d FSET %d at level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, // STsdbFS * pfs = REPO_FS(pRepo);
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); // SDFileSet *pSet;
continue;
} // // Get retention snapshot
// tsdbGetRtnSnap(pRepo, &rtn);
// tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
// while ((pSet = tsdbFSIterNext(&fsiter))) {
// if (pSet->fid < rtn.minFid) {
// tsdbInfo("vgId:%d FSET %d at level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
// TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
// continue;
// }
if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) { // if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) {
return -1; // return -1;
} // }
} // }
return 0; // return 0;
} // }
#endif \ No newline at end of file
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册