提交 15974edf 编写于 作者: H Hongze Cheng

more code

上级 e27b797f
......@@ -61,8 +61,6 @@ int tsdbCommitData(STsdbRepo *pRepo) {
if (tsdbCommitMetaData(pCommitH) < 0) goto _err;
if (tsdbApplyRetention(pCommitH) < 0) goto _err;
tsdbEndCommit(pCommitH, false);
return 0;
......@@ -80,7 +78,7 @@ static int tsdbStartCommit(SCommitHandle *pCommitH) {
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
pCommitH->pModLog = tdListNew(sizeof(void *));
pCommitH->pModLog = tdListNew(0);
if (pCommitH->pModLog == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
......@@ -119,11 +117,10 @@ static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) {
SListNode *pNode = NULL;
while ((pNode = tdListPopHead(pCommitH->pModLog)) != NULL) {
STsdbFileChange *pChange = (STsdbFileChange *)(*(void **)pNode->data);
STsdbFileChange *pChange = (STsdbFileChange *)pNode->data;
tsdbApplyFileChange(pChange, !hasError);
free(pNode);
free(pChange);
}
close(pCommitH->fd);
......@@ -141,22 +138,7 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) {
STsdbFileH *pFileH = pRepo->tsdbFileH;
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
for (int i = 0; i < pFileH->nFGroups; i++) {
SFileGroup *pFGroup = pFileH->pFGroup[i];
if (pFGroup->fileId < mfid) {
STsdbFileChange *pChange = (STsdbFileChange *)calloc(1, sizeof(STsdbFileChange) + sizeof(STsdbFileChange));
if (pChange == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pChange->type = TSDB_DATA_FILE_CHANGE;
SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change;
pDataFileChange->ofgroup = pFGroup;
} else {
break;
}
}
if (tsdbLogRetentionChange(pCommitH, mfid) < 0) return -1;
if (pMem->numOfRows <= 0) return 0;
......@@ -173,14 +155,20 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) {
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
if (fid < mfid) {
// TODO: skip data in this file beyond retentioin and continue;
// Skip data in files beyond retention
tsdbSeekTSCommitHandle(&tsCommitH, maxKey);
continue;
}
if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue;
{
// TODO: manifest log file group action
{ // TODO: Log file change
SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
if (pFGroup == NULL) {
} else {
}
}
if (tsdbCommitToFile(pRepo, fid, &tsCommitH) < 0) {
......@@ -248,11 +236,6 @@ static int tsdbCommitMetaData(SCommitHandle *pCommitH) {
return 0;
}
static int tsdbApplyRetention(SCommitHandle *pCommitH) {
// TODO
return 0;
}
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
......@@ -415,7 +398,10 @@ _err:
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) {
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
SCommitIter *pIter = iters + i;
if (pIter->pTable == NULL) continue;
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
}
return 0;
......@@ -524,27 +510,60 @@ static int tsdbLogMetaFileChange(SCommitHandle *pCommitH) {
STsdbRepo *pRepo = pCommitH->pRepo;
SKVStore * pStore = pRepo->tsdbMeta->pStore;
STsdbFileChange *pChange = (STsdbFileChange *)calloc(1, sizeof(*pChange) + sizeof(SMetaFileChange));
if (pChange == NULL) {
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(STsdbFileChange) + sizeof(SMetaFileChange));
if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
STsdbFileChange *pChange = pNode->data;
pChange->type = TSDB_META_FILE_CHANGE;
SMetaFileChange *pMetaChange = (SMetaFileChange *)(pChange->change);
strncpy(pMetaChange->oname, pStore->fname, TSDB_FILENAME_LEN);
strncpy(pMetaChange->nname, pStore->fname, TSDB_FILENAME_LEN);
pMetaChange->info = pStore->info;
if (tsdbLogFileChange(pCommitH, pChange) < 0) {
free(pChange);
free(pNode);
return -1;
}
tdListPrepend(pCommitH->pModLog, &pChange);
tdListPrependNode(pCommitH->pModLog, pNode);
return 0;
}
static int
static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) {
STsdbRepo * pRepo = pCommitH->pRepo;
STsdbFileH *pFileH = pRepo->tsdbFileH;
for (int i = 0; i < pFileH->nFGroups; i++) {
SFileGroup *pFGroup = pFileH->pFGroup[i];
if (pFGroup->fileId < mfid) {
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(STsdbFileChange) + sizeof(SDataFileChange));
if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
STsdbFileChange *pChange = (STsdbFileChange *)pNode->data;
pChange->type = TSDB_DATA_FILE_CHANGE;
SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change;
pDataFileChange->ofgroup = pFGroup;
if (tsdbLogFileChange(pCommitH, pChange) < 0) {
free(pNode);
return -1;
}
tdListPrependNode(pCommitH->pModLog, &pChange);
} else {
break;
}
}
return 0;
}
static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd) {
if (pChange->type == TSDB_META_FILE_CHANGE) {
......@@ -564,4 +583,14 @@ static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd) {
}
return 0;
}
static void tsdbSeekTSCommitHandle(STSCommitHandle *pTSCh, TSKEY key) {
for (int tid = 1; tid < pTSCh->maxIters; tid++) {
SCommitIter *pIter = pTSCh->pIters + tid;
if (pIter->pTable == NULL) continue;
while (tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key, INT32_MAX, NULL, NULL, 0) != 0) {
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册