From 15974edfdc7299bd0113394d643e68f965dbc8c8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 11 Oct 2020 21:03:07 +0800 Subject: [PATCH] more code --- src/tsdb/src/tsdbCommit.c | 99 +++++++++++++++++++++++++-------------- 1 file changed, 64 insertions(+), 35 deletions(-) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index dfe3d59380..3ac10f26d4 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -61,8 +61,6 @@ int tsdbCommitData(STsdbRepo *pRepo) { if (tsdbCommitMetaData(pCommitH) < 0) goto _err; - if (tsdbApplyRetention(pCommitH) < 0) goto _err; - tsdbEndCommit(pCommitH, false); return 0; @@ -80,7 +78,7 @@ static int tsdbStartCommit(SCommitHandle *pCommitH) { tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows); - pCommitH->pModLog = tdListNew(sizeof(void *)); + pCommitH->pModLog = tdListNew(0); if (pCommitH->pModLog == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; @@ -119,11 +117,10 @@ static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) { SListNode *pNode = NULL; while ((pNode = tdListPopHead(pCommitH->pModLog)) != NULL) { - STsdbFileChange *pChange = (STsdbFileChange *)(*(void **)pNode->data); + STsdbFileChange *pChange = (STsdbFileChange *)pNode->data; tsdbApplyFileChange(pChange, !hasError); free(pNode); - free(pChange); } close(pCommitH->fd); @@ -141,22 +138,7 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { STsdbFileH *pFileH = pRepo->tsdbFileH; int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); - for (int i = 0; i < pFileH->nFGroups; i++) { - SFileGroup *pFGroup = pFileH->pFGroup[i]; - if (pFGroup->fileId < mfid) { - STsdbFileChange *pChange = (STsdbFileChange *)calloc(1, sizeof(STsdbFileChange) + sizeof(STsdbFileChange)); - if (pChange == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - pChange->type = TSDB_DATA_FILE_CHANGE; - SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change; - pDataFileChange->ofgroup = pFGroup; - } else { - break; - } - } + if (tsdbLogRetentionChange(pCommitH, mfid) < 0) return -1; if (pMem->numOfRows <= 0) return 0; @@ -173,14 +155,20 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); if (fid < mfid) { - // TODO: skip data in this file beyond retentioin and continue; + // Skip data in files beyond retention + tsdbSeekTSCommitHandle(&tsCommitH, maxKey); continue; } if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue; - { - // TODO: manifest log file group action + { // TODO: Log file change + SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); + if (pFGroup == NULL) { + + } else { + + } } if (tsdbCommitToFile(pRepo, fid, &tsCommitH) < 0) { @@ -248,11 +236,6 @@ static int tsdbCommitMetaData(SCommitHandle *pCommitH) { return 0; } -static int tsdbApplyRetention(SCommitHandle *pCommitH) { - // TODO - return 0; -} - static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -415,7 +398,10 @@ _err: static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { for (int i = 0; i < nIters; i++) { - TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); + SCommitIter *pIter = iters + i; + if (pIter->pTable == NULL) continue; + + TSKEY nextKey = tsdbNextIterKey(pIter->pIter); if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; } return 0; @@ -524,27 +510,60 @@ static int tsdbLogMetaFileChange(SCommitHandle *pCommitH) { STsdbRepo *pRepo = pCommitH->pRepo; SKVStore * pStore = pRepo->tsdbMeta->pStore; - STsdbFileChange *pChange = (STsdbFileChange *)calloc(1, sizeof(*pChange) + sizeof(SMetaFileChange)); - if (pChange == NULL) { + SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(STsdbFileChange) + sizeof(SMetaFileChange)); + if (pNode == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } + STsdbFileChange *pChange = pNode->data; pChange->type = TSDB_META_FILE_CHANGE; + SMetaFileChange *pMetaChange = (SMetaFileChange *)(pChange->change); strncpy(pMetaChange->oname, pStore->fname, TSDB_FILENAME_LEN); strncpy(pMetaChange->nname, pStore->fname, TSDB_FILENAME_LEN); pMetaChange->info = pStore->info; + if (tsdbLogFileChange(pCommitH, pChange) < 0) { - free(pChange); + free(pNode); return -1; } - tdListPrepend(pCommitH->pModLog, &pChange); + tdListPrependNode(pCommitH->pModLog, pNode); return 0; } -static int +static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) { + STsdbRepo * pRepo = pCommitH->pRepo; + STsdbFileH *pFileH = pRepo->tsdbFileH; + + for (int i = 0; i < pFileH->nFGroups; i++) { + SFileGroup *pFGroup = pFileH->pFGroup[i]; + if (pFGroup->fileId < mfid) { + SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(STsdbFileChange) + sizeof(SDataFileChange)); + if (pNode == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + STsdbFileChange *pChange = (STsdbFileChange *)pNode->data; + pChange->type = TSDB_DATA_FILE_CHANGE; + + SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change; + pDataFileChange->ofgroup = pFGroup; + + if (tsdbLogFileChange(pCommitH, pChange) < 0) { + free(pNode); + return -1; + } + tdListPrependNode(pCommitH->pModLog, &pChange); + } else { + break; + } + } + + return 0; +} static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd) { if (pChange->type == TSDB_META_FILE_CHANGE) { @@ -564,4 +583,14 @@ static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd) { } return 0; +} + +static void tsdbSeekTSCommitHandle(STSCommitHandle *pTSCh, TSKEY key) { + for (int tid = 1; tid < pTSCh->maxIters; tid++) { + SCommitIter *pIter = pTSCh->pIters + tid; + if (pIter->pTable == NULL) continue; + + while (tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key, INT32_MAX, NULL, NULL, 0) != 0) { + } + } } \ No newline at end of file -- GitLab