提交 608cd044 编写于 作者: H Hongze Cheng

refactor more code

上级 da910465
......@@ -181,8 +181,8 @@ typedef struct {
} SFile;
typedef struct {
int fileId;
int state; // 0 for health, 1 for problem
int32_t fileId;
int32_t state; // 0 for health, 1 for problem
SFile files[TSDB_FILE_TYPE_MAX];
} SFileGroup;
......
......@@ -107,6 +107,8 @@ _err:
}
static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) {
STsdbRepo *pRepo = pCommitH->pRepo;
// TODO: append commit over flag
if (false /* tsdbLogCommitOver(pCommitH) < 0 */) {
hasError = true;
......@@ -127,6 +129,11 @@ static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) {
pCommitH->fd = -1;
remove(pCommitH->fname);
tdListFree(pCommitH->pModLog);
// notify uplayer to delete WAL
if (!hasError && pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
}
return;
}
......@@ -162,27 +169,20 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) {
if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue;
{ // TODO: Log file change
SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
if (pFGroup == NULL) {
} else {
}
if (tsdbLogTSFileChange(pCommitH, fid) < 0) {
tsdbDestroyTSCommitHandle(&tsCommitH);
return -1;
}
if (tsdbCommitToFile(pRepo, fid, &tsCommitH) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
tsdbError("vgId:%d error occurs while committing to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
tsdbDestroyTSCommitHandle(&tsCommitH);
return -1;
}
}
tsdbDestroyTSCommitHandle(&tsCommitH);
return 0;
_err:
tsdbDestroyTSCommitHandle(&tsCommitH);
return -1;
}
// Function to commit meta data
......@@ -289,7 +289,44 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
free(iters);
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, STSCommitHandle *pTSCh) {
static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup, STSCommitHandle *pTSCh) {
SRWHelper * pWHelper = &(pTSCh->whelper);
SCommitIter *iters = pTSCh->pIters;
if (tsdbHelperOpenFile(pWHelper) < 0) return -1;
if (tsdbLoadCompIdx(pWHelper, NULL) < 0) {
tsdbHelperCloseFile(pWHelper, true /* hasError = false */);
return -1;
}
for (int tid = 1; tid < pTSCh->maxIters; tid++) {
if (tsdbCommitTableData(pTSCh, tid) < 0) {
tsdbHelperCloseFile(pWHelper, true /* hasError = false */);
return -1;
}
if (tsdbTryMoveLastBlock(pTSCh) < 0) {
tsdbHelperCloseFile(pWHelper, true /* hasError = false */);
return -1;
}
if (tsdbWriteBlockInfo(pWHelper) < 0) {
tsdbHelperCloseFile(pWHelper, true /* hasError = false */);
return -1;
}
}
if (tsdbWriteBlockIdx(pWHelper) < 0) {
tsdbHelperCloseFile(pWHelper, true /* hasError = false */);
return -1;
}
tsdbHelperCloseFile(pWHelper, false /* hasError = false */);
return 0;
}
static int tsdbCommitToFile(STsdbRepo *pRepo, SFileGroup *pOldFGroup, SFileGroup *pNewFGroup, STSCommitHandle *pTSCh) {
char * dataDir = NULL;
STsdbCfg * pCfg = &pRepo->config;
STsdbFileH * pFileH = pRepo->tsdbFileH;
......@@ -299,18 +336,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, STSCommitHandle *pTSCh) {
SCommitIter *iters = pTSCh->pIters;
SRWHelper * pHelper = &(pTSCh->whelper);
SDataCols * pDataCols = pTSCh->pDataCols;
int fid = pOldFGroup->fileId;
// Create and open files for commit
dataDir = tsdbGetDataDirName(pRepo->rootDir);
if (dataDir == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
ASSERT(pOldFGroup->fileId == pNewFGroup->fileId);
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
......@@ -372,21 +400,21 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, STSCommitHandle *pTSCh) {
taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 0, pGroup);
pthread_rwlock_wrlock(&(pFileH->fhlock));
// pthread_rwlock_wrlock(&(pFileH->fhlock));
(void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
// (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
// pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
if (newLast) {
(void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
} else {
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
}
// if (newLast) {
// (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
// pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
// } else {
// pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
// }
pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
// pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
pthread_rwlock_unlock(&(pFileH->fhlock));
// pthread_rwlock_unlock(&(pFileH->fhlock));
return 0;
......@@ -488,12 +516,15 @@ static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange) {
int tsize = 0;
if (pChange->type == TSDB_META_FILE_CHANGE) {
SMetaFileChange *pMetaChange = (SMetaFileChange *)pChange->change;
tsize += taosEncodeString(buf, pMetaChange->oname);
tsize += taosEncodeString(buf, pMetaChange->nname);
tsize += tdEncodeStoreInfo(buf, pMetaChange->info);
} else if (pChange->type == TSDB_DATA_FILE_CHANGE) {
SDataFileChange *pDataChange = (SDataFileChange *)pChange->change;
// TODO
tsize += tsdbEncodeSFileGroup(buf, &(pDataChange->ofgroup));
tsize += tsdbEncodeSFileGroup(buf, &(pDataChange->nfgroup));
} else {
ASSERT(false);
}
......@@ -506,6 +537,40 @@ static void *tsdbDecodeFileChange(void *buf, STsdbFileChange *pChange) {
return buf;
}
static int tsdbLogTSFileChange(SCommitHandle *pCommitH, int fid) {
STsdbRepo * pRepo = pCommitH->pRepo;
STsdbFileH *pFileH = pRepo->tsdbFileH;
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(STsdbFileChange) + sizeof(SDataFileChange));
if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
STsdbFileChange *pChange = (STsdbFileChange *)pNode->data;
pChange->type = TSDB_DATA_FILE_CHANGE;
SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change;
SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
if (pFGroup == NULL) {
pDataFileChange->ofgroup.fileId = fid;
} else {
pDataFileChange->ofgroup = *pFGroup;
}
tsdbGetNextCommitFileGroup(&(pDataFileChange->ofgroup), &(pDataFileChange->nfgroup));
if (tsdbLogFileChange(pCommitH, pChange) < 0) {
free(pNode);
return -1;
}
tdListAppendNode(pCommitH->pModLog, pNode);
return 0;
}
static int tsdbLogMetaFileChange(SCommitHandle *pCommitH) {
STsdbRepo *pRepo = pCommitH->pRepo;
SKVStore * pStore = pRepo->tsdbMeta->pStore;
......@@ -528,7 +593,7 @@ static int tsdbLogMetaFileChange(SCommitHandle *pCommitH) {
free(pNode);
return -1;
}
tdListPrependNode(pCommitH->pModLog, pNode);
tdListAppendNode(pCommitH->pModLog, pNode);
return 0;
}
......@@ -556,7 +621,7 @@ static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) {
free(pNode);
return -1;
}
tdListPrependNode(pCommitH->pModLog, &pChange);
tdListAppendNode(pCommitH->pModLog, &pChange);
} else {
break;
}
......@@ -594,3 +659,85 @@ static void tsdbSeekTSCommitHandle(STSCommitHandle *pTSCh, TSKEY key) {
}
}
}
static int tsdbEncodeSFileGroup(void **buf, SFileGroup *pFGroup) {
int tsize = 0;
tsize += taosEncodeVariantI32(buf, pFGroup->fileId);
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pFile = &(pFGroup->files[type]);
tsize += taosEncodeString(buf, pFile->fname);
tsize += tsdbEncodeSFileInfo(buf, &pFile->info);
}
return tsize;
}
static void *tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup) {
buf = taosDecodeVariantI32(buf, &(pFGroup->fileId));
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pFile = &(pFGroup->files[type]);
buf = taosDecodeString(buf, &(pFile->fname));
buf = tsdbDecodeSFileInfo(buf, &(pFile->info));
}
return buf;
}
static void tsdbGetNextCommitFileGroup(SFileGroup *pOldGroup, SFileGroup *pNewGroup) {
pNewGroup->fileId = pOldGroup->fileId;
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pOldFile = &(pOldGroup->files[type]);
SFile *pNewFile = &(pNewGroup->files[type]);
size_t len =strlen(pOldFile->fname);
if (len == 0 || pOldFile->fname[len - 1] == '1') {
tsdbGetFileName(pRepo->rootDir, type, vid, pOldGroup->fileId, 0, pNewFile->fname);
} else {
tsdbGetFileName(pRepo->rootDir, type, vid, pOldGroup->fileId, 1, pNewFile->fname);
}
}
}
static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
SCommitIter *pIter = pTSCh->pIters + tid;
if (pIter->pTable == NULL) return 0;
taosRLockLatch(&(pIter->pTable->latch));
if (pIter->pIter == NULL) {
// TODO
}
if (tdInitDataCols(pTSCh->pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
if (tsdbReadBlockInfo() < 0) {
goto _err;
}
while (true) {
TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
if (keyNext < 0 || keyNext > maxKey) break;
if (/* no block info exists*/ || keyNext > pIdx->maxKey) {
if (tsdbProcessAppendCommit() < 0) goto _err;
} else {
if (tsdbProcessMergeCommit() < 0) goto _err;
}
}
taosRUnLockLatch(&(pIter->pTable->latch));
return 0;
_err:
taosRUnLockLatch(&(pIter->pTable->latch));
return -1;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "tchecksum.h"
#include "tsdbMain.h"
#define TSDB_MANIFEST_FILE_VERSION 0
#define TSDB_MANIFEST_FILE_HEADER_SIZE 128
#define TSDB_MANIFEST_END "C0D09F476DEF4A32B694A6A9E7B7B240"
#define TSDB_MANIFEST_END_SIZE 32
#define TSDB_MANIFEST_END_RECORD 0
#define TSDB_MANIFEST_META_RECORD 1
#define TSDB_MANIFEST_DATA_RECORD 2
typedef struct {
int type;
int len;
} SManifestRecord;
int tsdbInitManifestHandle(STsdbRepo *pRepo, SManifestHandle *pManifest) {
STsdbCfg *pCfg = &(pRepo->config);
pManifest->pBuffer = NULL;
pManifest->contSize = 0;
tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, &(pManifest->fname));
pManifest->fd = open(pManifest->fname, O_CREAT | O_APPEND, 0755);
if (pManifest->fd < 0) {
tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tsdbWriteManifestHeader(pRepo, pManifest) < 0) {
tsdbCloseManifestHandle(pRepo, pManifest);
return -1;
}
return 0;
}
void tsdbCloseManifestHandle(SManifestHandle *pManifest) {
if (pManifest != NULL && pManifest->fd > 0) {
close(pManifest->fd);
pManifest->fd = -1;
}
remove(pManifest->fname);
taosTZfree(pManifest->pBuffer);
pManifest->pBuffer = NULL;
pManifest->contSize = 0;
return 0;
}
int tsdbAppendManifestRecord(SManifestHandle *pManifest, STsdbRepo *pRepo, int type) {
ASSERT(pManifest->pBuffer != NULL && taosTSizeof(pManifest->pBuffer) >= pManifest->contSize);
if (pManifest->contSize > 0) {
if (tsdbManifestMakeMoreRoom(pManifest, sizeof(TSCKSUM)) < 0) return -1;
pManifest->contSize += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pManifest->pBuffer, pManifest->contSize);
}
SManifestRecord mRecord = {.type = type, .len = pManifest->contSize};
// Write mRecord part
if (taosTWrite(pManifest->fd, (void *)(&mRecord), sizeof(mRecord)) < sizeof(mRecord)) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), sizeof(mRecord), pManifest->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
// Write buffer part
if (pManifest->contSize > 0 && taosTWrite(pManifest->fd, pManifest->pBuffer, pManifest->contSize) < pManifest->contSize) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), pManifest->contSize,
pManifest->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (fsync(pManifest->fd) < 0) {
tsdbError("vgId:%d failed to fsync file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
int tsdbAppendManifestEnd(SManifestHandle *pManifest, STsdbRepo *pRepo) {
pManifest->contSize = 0;
return tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_END_RECORD);
}
int tsdbManifestMakeRoom(SManifestHandle *pManifest, int expectedSize) {
pManifest->pBuffer = taosTRealloc(pManifest->pBuffer, expectedSize);
if (pManifest->pBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
int tsdbManifestMakeMoreRoom(SManifestHandle *pManifest, int moreSize) {
return tsdbManifestMakeRoom(pManifest, pManifest->contSize + moreSize);
}
// TODO
bool tsdbIsManifestEnd(SManifestHandle *pManifest) {
SManifestRecord mRecord;
if (lseek(pManifest->fd, sizeof(mRecord), SEEK_END) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return false;
}
if (taosTRead(pManifest->fd, (void *)(&mRecord), sizeof(mRecord)) < 0) {
tsdbError("vgId:%d failed to read manifest end from file %s since %s", REPO_ID(pRepo), pManifest->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return false;
}
return (mRecord.type == TSDB_MANII)
}
int tsdbManifestRollBackOrForward(SManifestHandle *pManifest, bool isManifestEnd, STsdbRepo *pRepo) {
SManifestRecord mRecord;
if (lseek(pManifest->fd, TSDB_MANIFEST_FILE_HEADER_SIZE, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return;
}
while (true) {
ssize_t size = 0;
size = taosTRead(pManifest->fd, (void *)(&mRecord), sizeof(mRecord));
if (size < 0) {
tsdbError("vgId:%d failed to read SManifestRecord part from file %s since %s", REPO_ID(pRepo), pManifest->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (size < sizeof(mRecord)) break;
if ((mRecord.type != TSDB_MANIFEST_DATA_RECORD && mRecord.type != TSDB_MANIFEST_META_RECORD && mRecord.type != TSDB_MANIFEST_END_RECORD) || mRecord.len < 0) {
tsdbError("vgId:%d manifest file %s is broken since invalid mRecord content", REPO_ID(pRepo), pManifest->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (mRecord.type == TSDB_MANIFEST_END_RECORD) {
ASSERT(isManifestEnd && mRecord.len == 0);
break;
}
if (tsdbManifestMakeRoom(pManifest, mRecord.len) < 0) return -1;
size = taosTRead(pManifest->fd, pManifest->pBuffer, mRecord.len);
if (size < 0) {
tsdbError("vgId:%d failed to read SManifestRecord content from file %s since %s", REPO_ID(pRepo), pManifest->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (size < mRecord.len) break;
if (!taosCheckChecksumWhole((uint8_t *)pManifest->pBuffer, size)) {
tsdbError("vgId:%d manifest file %s is broken since checksum error", REPO_ID(pRepo), pManifest->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (mRecord.type == TSDB_MANIFEST_DATA_RECORD) {
// func1(pManifest->pBuffer, mRecord.len, isManifestEnd);
} else if (mRecord.type == TSDB_MANIFEST_META_RECORD) {
// func2(pManifest->pBuffer, mRecord.len, isManifestEnd);
} else {
ASSERT(0);
}
}
return 0;
}
int tsdbEncodeManifestRecord(SManifestHandle *pManifest) {
pManifest->contSize = 0;
}
static int tsdbEncodeManifestHeader(void **buffer) {
int len = taosEncodeFixedU32(buf, TSDB_MANIFEST_FILE_VERSION);
return len;
}
static void *tsdbDecodeManifestHeader(void *buffer, uint32_t version) {
buffer = taosDecodeFixedU32(buffer, &version);
return buffer;
}
static int tsdbWriteManifestHeader(STsdbRepo *pRepo, SManifestHandle *pManifest) {
char buffer[TSDB_MANIFEST_FILE_HEADER_SIZE] = "\0";
tsdbEncodeManifestHeader(&buffer);
taosCalcChecksumAppend(0, (uint8_t)buffer, TSDB_MANIFEST_FILE_HEADER_SIZE);
if (taosTWrite(pManifest->fd, buffer, TSDB_MANIFEST_FILE_HEADER_SIZE) < 0) {
tsdbError("vgId:%d failed to write file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
\ No newline at end of file
......@@ -463,315 +463,315 @@ static void tsdbFreeTableData(STableData *pTableData) {
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); }
static void *tsdbCommitData(void *arg) {
STsdbRepo * pRepo = (STsdbRepo *)arg;
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &pRepo->config;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SCommitHandle commitHandle = {0};
SCommitHandle *pCommitH = &commitHandle;
ASSERT(pRepo->commit == 1 && pMem != NULL);
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
pCommitH->pRepo = pRepo;
if (tsdbInitManifestHandle(pRepo, &(pCommitH->manifest)) < 0) goto _exit;
// Create the iterator to read from cache
if (pMem->numOfRows > 0) {
iters = tsdbCreateCommitIters(pRepo);
if (iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if (tsdbInitWriteHelper(&(pCommitH->whelper), pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _exit;
}
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &(pCommitH->whelper), pDataCols) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _exit;
}
}
}
// Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
tsdbFitRetention(pRepo);
if (tsdbAppendManifestEnd(&pCommitH->manifest, pCommitH->pRepo) < 0) {
// TODO
}
tsdbApplyManifestAction(&pCommitH->manifest);
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbCloseManifestHandle(&(pCommitH->manifest));
tsdbDestroyHelper(&(pCommitH->whelper));
tsdbEndCommit(pRepo);
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
return NULL;
}
static int tsdbCommitMeta(STsdbRepo *pRepo, SManifestHandle *pManifest) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SActObj * pAct = NULL;
SActCont * pCont = NULL;
if (listNEles(pMem->actList) > 0) {
pManifest->contSize = tdEncodeCommitAction(pMeta->pStore, &(pManifest->pBuffer));
if (tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_META_RECORD) < 0) goto _err;
if (tdKVStoreStartCommit(pMeta->pStore) < 0) {
tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
SListNode *pNode = NULL;
while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
pAct = (SActObj *)pNode->data;
if (pAct->act == TSDB_UPDATE_META) {
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
tdKVStoreEndCommit(pMeta->pStore);
goto _err;
}
} else if (pAct->act == TSDB_DROP_META) {
if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
tdKVStoreEndCommit(pMeta->pStore);
goto _err;
}
} else {
ASSERT(false);
}
}
if (tdKVStoreEndCommit(pMeta->pStore) < 0) {
tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
}
return 0;
_err:
return -1;
}
static void tsdbEndCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->commit == 1);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
}
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) {
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
}
return 0;
}
// static void *tsdbCommitData(void *arg) {
// STsdbRepo * pRepo = (STsdbRepo *)arg;
// SMemTable * pMem = pRepo->imem;
// STsdbCfg * pCfg = &pRepo->config;
// STsdbMeta * pMeta = pRepo->tsdbMeta;
// SCommitHandle commitHandle = {0};
// SCommitHandle *pCommitH = &commitHandle;
// ASSERT(pRepo->commit == 1 && pMem != NULL);
// tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
// pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
// pCommitH->pRepo = pRepo;
// if (tsdbInitManifestHandle(pRepo, &(pCommitH->manifest)) < 0) goto _exit;
// // Create the iterator to read from cache
// if (pMem->numOfRows > 0) {
// iters = tsdbCreateCommitIters(pRepo);
// if (iters == NULL) {
// tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _exit;
// }
// if (tsdbInitWriteHelper(&(pCommitH->whelper), pRepo) < 0) {
// tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _exit;
// }
// if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
// REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
// goto _exit;
// }
// int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
// int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
// // Loop to commit to each file
// for (int fid = sfid; fid <= efid; fid++) {
// if (tsdbCommitToFile(pRepo, fid, iters, &(pCommitH->whelper), pDataCols) < 0) {
// tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
// goto _exit;
// }
// }
// }
// // Commit to update meta file
// if (tsdbCommitMeta(pRepo) < 0) {
// tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _exit;
// }
// tsdbFitRetention(pRepo);
// if (tsdbAppendManifestEnd(&pCommitH->manifest, pCommitH->pRepo) < 0) {
// // TODO
// }
// tsdbApplyManifestAction(&pCommitH->manifest);
// _exit:
// tdFreeDataCols(pDataCols);
// tsdbDestroyCommitIters(iters, pMem->maxTables);
// tsdbCloseManifestHandle(&(pCommitH->manifest));
// tsdbDestroyHelper(&(pCommitH->whelper));
// tsdbEndCommit(pRepo);
// tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
// return NULL;
// }
// static int tsdbCommitMeta(STsdbRepo *pRepo, SManifestHandle *pManifest) {
// SMemTable *pMem = pRepo->imem;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
// SActObj * pAct = NULL;
// SActCont * pCont = NULL;
// if (listNEles(pMem->actList) > 0) {
// pManifest->contSize = tdEncodeCommitAction(pMeta->pStore, &(pManifest->pBuffer));
// if (tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_META_RECORD) < 0) goto _err;
// if (tdKVStoreStartCommit(pMeta->pStore) < 0) {
// tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// SListNode *pNode = NULL;
// while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
// pAct = (SActObj *)pNode->data;
// if (pAct->act == TSDB_UPDATE_META) {
// pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
// if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
// tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
// tstrerror(terrno));
// tdKVStoreEndCommit(pMeta->pStore);
// goto _err;
// }
// } else if (pAct->act == TSDB_DROP_META) {
// if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
// tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
// tstrerror(terrno));
// tdKVStoreEndCommit(pMeta->pStore);
// goto _err;
// }
// } else {
// ASSERT(false);
// }
// }
// if (tdKVStoreEndCommit(pMeta->pStore) < 0) {
// tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// }
// return 0;
// _err:
// return -1;
// }
// static void tsdbEndCommit(STsdbRepo *pRepo) {
// ASSERT(pRepo->commit == 1);
// if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
// }
// static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
// for (int i = 0; i < nIters; i++) {
// TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
// if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
// }
// return 0;
// }
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
char * dataDir = NULL;
STsdbCfg * pCfg = &pRepo->config;
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = NULL;
SMemTable * pMem = pRepo->imem;
bool newLast = false;
TSKEY minKey = 0, maxKey = 0;
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// Check if there are data to commit to this file
int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
if (!hasDataToCommit) {
tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
return 0;
}
// Create and open files for commit
dataDir = tsdbGetDataDirName(pRepo->rootDir);
if (dataDir == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
newLast = TSDB_NLAST_FILE_OPENED(pHelper);
if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Loop to commit data in each table
for (int tid = 1; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue;
taosRLockLatch(&(pIter->pTable->latch));
if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
if (pIter->pIter != NULL) {
if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch));
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
tstrerror(terrno));
goto _err;
}
}
taosRUnLockLatch(&(pIter->pTable->latch));
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
}
if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 0, pGroup);
pthread_rwlock_wrlock(&(pFileH->fhlock));
(void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
if (newLast) {
(void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
} else {
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
}
pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
pthread_rwlock_unlock(&(pFileH->fhlock));
return 0;
_err:
taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 1, NULL);
return -1;
}
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
if (iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
// reference all tables
for (int i = 0; i < pMem->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
iters[i].pTable = pMeta->tables[i];
}
}
if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
for (int i = 0; i < pMem->maxTables; i++) {
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
tSkipListIterNext(iters[i].pIter);
}
}
return iters;
_err:
tsdbDestroyCommitIters(iters, pMem->maxTables);
return NULL;
}
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
if (iters == NULL) return;
for (int i = 1; i < maxTables; i++) {
if (iters[i].pTable != NULL) {
tsdbUnRefTable(iters[i].pTable);
tSkipListDestroyIter(iters[i].pIter);
}
}
free(iters);
}
// static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
// char * dataDir = NULL;
// STsdbCfg * pCfg = &pRepo->config;
// STsdbFileH *pFileH = pRepo->tsdbFileH;
// SFileGroup *pGroup = NULL;
// SMemTable * pMem = pRepo->imem;
// bool newLast = false;
// TSKEY minKey = 0, maxKey = 0;
// tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// // Check if there are data to commit to this file
// int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
// if (!hasDataToCommit) {
// tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
// return 0;
// }
// // Create and open files for commit
// dataDir = tsdbGetDataDirName(pRepo->rootDir);
// if (dataDir == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return -1;
// }
// if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
// tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
// goto _err;
// }
// // Open files for write/read
// if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
// tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// newLast = TSDB_NLAST_FILE_OPENED(pHelper);
// if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
// tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// // Loop to commit data in each table
// for (int tid = 1; tid < pMem->maxTables; tid++) {
// SCommitIter *pIter = iters + tid;
// if (pIter->pTable == NULL) continue;
// taosRLockLatch(&(pIter->pTable->latch));
// if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
// if (pIter->pIter != NULL) {
// if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _err;
// }
// if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
// taosRUnLockLatch(&(pIter->pTable->latch));
// tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
// TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
// tstrerror(terrno));
// goto _err;
// }
// }
// taosRUnLockLatch(&(pIter->pTable->latch));
// // Move the last block to the new .l file if neccessary
// if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
// tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// // Write the SCompBlock part
// if (tsdbWriteCompInfo(pHelper) < 0) {
// tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
// goto _err;
// }
// }
// if (tsdbWriteCompIdx(pHelper) < 0) {
// tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
// goto _err;
// }
// taosTFree(dataDir);
// tsdbCloseHelperFile(pHelper, 0, pGroup);
// pthread_rwlock_wrlock(&(pFileH->fhlock));
// (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
// pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
// if (newLast) {
// (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
// pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
// } else {
// pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
// }
// pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
// pthread_rwlock_unlock(&(pFileH->fhlock));
// return 0;
// _err:
// taosTFree(dataDir);
// tsdbCloseHelperFile(pHelper, 1, NULL);
// return -1;
// }
// static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
// SMemTable *pMem = pRepo->imem;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
// SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
// if (iters == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return NULL;
// }
// if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
// // reference all tables
// for (int i = 0; i < pMem->maxTables; i++) {
// if (pMeta->tables[i] != NULL) {
// tsdbRefTable(pMeta->tables[i]);
// iters[i].pTable = pMeta->tables[i];
// }
// }
// if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
// for (int i = 0; i < pMem->maxTables; i++) {
// if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
// if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// goto _err;
// }
// tSkipListIterNext(iters[i].pIter);
// }
// }
// return iters;
// _err:
// tsdbDestroyCommitIters(iters, pMem->maxTables);
// return NULL;
// }
// static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
// if (iters == NULL) return;
// for (int i = 1; i < maxTables; i++) {
// if (iters[i].pTable != NULL) {
// tsdbUnRefTable(iters[i].pTable);
// tSkipListDestroyIter(iters[i].pIter);
// }
// }
// free(iters);
// }
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
ASSERT(pMemTable->maxTables < maxTables);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册