提交 17581100 编写于 作者: H Hongze Cheng

more work

上级 5c796c8b
......@@ -241,6 +241,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "No table d
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "File already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "Need to reconfigure table")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, 0, 0x0612, "Invalid information to create table")
TAOS_DEFINE_ERROR(TSDB_TDB_NO_AVAIL_DISK, 0, 0x0613, "No available disk")
// query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "Invalid handle")
......
......@@ -56,6 +56,7 @@ typedef struct {
#define TFILE_LEVEL(pf) ((pf)->level)
#define TFILE_ID(pf) ((pf)->id)
#define TFILE_NAME(pf) ((pf)->aname)
#define TFILE_REL_NAME(pf) ((pf)->rname)
void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
bool tfsIsSameFile(TFILE *pf1, TFILE *pf2);
......
......@@ -30,6 +30,8 @@ extern "C" {
#define TSDB_FILE_FULL_NAME(f) TFILE_NAME(TSDB_FILE_F(f))
#define TSDB_FILE_OPENED(f) (TSDB_FILE_FD(f) >= 0)
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
typedef enum {
TSDB_FILE_HEAD = 0,
......@@ -214,13 +216,16 @@ typedef struct {
#define TSDB_FSET_FID(s) ((s)->fid)
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id);
void tsdbInitDFileSetWithOld(SDFileSet* pSet, SDFileSet* pOldSet);
int tsdbOpenDFileSet(SDFileSet* pSet, int flags);
void tsdbCloseDFileSet(SDFileSet* pSet);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbCopyDFileSet(SDFileSet* pFromSet, SDFileSet* pToSet);
int tsdbCopyDFileSet(SDFileSet src, int tolevel, int toid, SDFileSet* pDest);
int tsdbCopyDFileSet(SDFileSet src, int tolevel, int toid, SDFileSet* pDest);
#ifdef __cplusplus
}
......
......@@ -174,7 +174,6 @@ void tsdbGetStoreInfo(char* fname, uint32_t* magic, int64_t* size);
// int tsdbLoadFileHeader(SFile* pFile, uint32_t* version);
// void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
// void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
// int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup);
// ================= tsdbMain.c
......
......@@ -85,6 +85,7 @@ int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
void* tsdbCommitData(STsdbRepo* pRepo);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY* minKey, TSKEY* maxKey);
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL;
......
......@@ -19,6 +19,7 @@
// TODO: remove the include
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/types.h>
......@@ -84,6 +85,7 @@ struct STsdbRepo {
#define REPO_ID(r) (r)->config.tsdbId
#define REPO_CFG(r) (&((r)->config))
#define REPO_FS_VERSION(r) // TODO
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
......
......@@ -12,10 +12,11 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbMain.h"
#include "tsdbint.h"
#define TSDB_IVLD_FID INT_MIN
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_KEY_FID(key, days, precision) ((key) / tsMsPerDay[(precision)] / (days))
typedef struct {
int minFid;
......@@ -25,28 +26,31 @@ typedef struct {
} SRtn;
typedef struct {
int version;
SRtn rtn; // retention snapshot
int niters;
SCommitIter *iters; // memory iterators
bool isRFileSet;
SReadH readh;
SDFileSet * pWSet;
SFSIter fsIter; // tsdb file iterator
int niters; // memory iterators
SCommitIter *iters;
SDFileSet wSet; // commit file
TSKEY minKey;
TSKEY maxKey;
SArray * aBlkIdx;
SArray * aSupBlk;
SArray * aSubBlk;
SArray * aBlkIdx; // SBlockIdx array
SArray * aSupBlk; // Table super-block array
SArray * aSubBlk; // table sub-block array
SDataCols * pDataCols;
} SCommitH;
#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh))
#define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh)))
#define TSDB_COMMIT_WRITE_FSET(ch) ((ch)->pWSet)
#define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet))
#define TSDB_COMMIT_TABLE(ch) TSDB_READ_TABLE(&(ch->readh))
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&(ch->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&(ch->readh))
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5)
void *tsdbCommitData(STsdbRepo *pRepo) {
......@@ -78,73 +82,7 @@ _err:
return NULL;
}
static int tsdbCommitTSData(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbCfg * pCfg = &(pRepo->config);
SCommitH ch = {0};
SFSIter fsIter = {0};
SDFileSet *pOldSet = NULL;
SDFileSet nSet;
int level, id;
int fid;
if (pMem->numOfRows <= 0) return 0;
// Resource initialization
if (tsdbInitCommitH(pRepo, &ch) < 0) {
// TODO
return -1;
}
tsdbInitFSIter(pRepo, &fsIter);
// Skip expired memory data and expired FSET
tsdbSeekCommitIter(ch.iters, pMem->maxTables, ch.rtn.minKey);
fid = tsdbNextCommitFid(ch.iters, pMem->maxTables);
while (true) {
pOldSet = tsdbFSIterNext(&fsIter);
if (pOldSet == NULL || pOldSet->fid >= ch.rtn.minFid) break;
}
// Loop to commit to each file
while (true) {
// Loop over both on disk and memory
if (pOldSet == NULL && fid == TSDB_IVLD_FID) break;
// Only has existing FSET but no memory data to commit in this
// existing FSET, only check if file in correct retention
if (pOldSet && (fid == TSDB_IVLD_FID || pOldSet->fid < fid)) {
if (tsdbApplyRtn(*pOldSet, &(ch.rtn), &nSet) < 0) {
return -1;
}
tsdbUpdateDFileSet(pRepo, &nSet);
pOldSet = tsdbFSIterNext(&fsIter);
continue;
}
SDFileSet *pCSet;
int cfid;
if (pOldSet == NULL || pOldSet->fid > fid) {
// Commit to a new FSET with fid: fid
pCSet = NULL;
cfid = fid;
} else {
// Commit to an existing FSET
pCSet = pOldSet;
cfid = pOldSet->fid;
pOldSet = tsdbFSIterNext(&fsIter);
}
fid = tsdbNextCommitFid(ch.iters, pMem->maxTables);
tsdbCommitToFile(pCSet, &ch, cfid);
}
tsdbDestroyCommitH(&ch, pMem->maxTables);
return 0;
}
// =================== Commit Meta Data
static int tsdbCommitMeta(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
......@@ -196,6 +134,93 @@ _err:
return -1;
}
// =================== Commit Time-Series Data
static int tsdbCommitTSData(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbCfg * pCfg = REPO_CFG(pRepo);
SCommitH ch = {0};
SDFileSet *pSet = NULL;
SDFileSet nSet;
int fid;
if (pMem->numOfRows <= 0) return 0;
// Resource initialization
if (tsdbInitCommitH(pRepo, &ch) < 0) {
return -1;
}
// Skip expired memory data and expired FSET
tsdbSeekCommitIter(&ch, ch.rtn.minKey);
while (true) {
pSet = tsdbFSIterNext(&(ch.fsIter));
if (pSet == NULL || pSet->fid >= ch.rtn.minFid) break;
}
// Loop to commit to each file
fid = tsdbNextCommitFid(&(ch));
while (true) {
// Loop over both on disk and memory
if (pSet == NULL && fid == TSDB_IVLD_FID) break;
if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) {
// Only has existing FSET but no memory data to commit in this
// existing FSET, only check if file in correct retention
int level, id;
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(ch.rtn)), &level, &id);
if (level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_TDB_NO_AVAIL_DISK;
tsdbDestroyCommitH(&ch);
return -1;
}
if (level > TSDB_FSET_LEVEL(pSet)) {
if (tsdbCopyDFileSet(*pSet, level, id, &nSet) < 0) {
tsdbDestroyCommitH(&ch);
return -1;
}
if (tsdbUpdateDFileSet(pRepo, &nSet) < 0) {
tsdbDestroyCommitH(&ch);
return -1;
}
} else {
if (tsdbUpdateDFileSet(pRepo, pSet) < 0) {
tsdbDestroyCommitH(&ch);
return -1;
}
}
pSet = tsdbFSIterNext(&(ch.fsIter));
} else {
// Has memory data to commit
SDFileSet *pCSet;
int cfid;
if (pSet == NULL || pSet->fid > fid) {
// Commit to a new FSET with fid: fid
pCSet = NULL;
cfid = fid;
} else {
// Commit to an existing FSET
pCSet = pSet;
cfid = pSet->fid;
pSet = tsdbFSIterNext(&(ch.fsIter));
}
fid = tsdbNextCommitFid(&ch);
if (tsdbCommitToFile(pCSet, &ch, cfid) < 0) {
tsdbDestroyCommitH(&ch);
return -1;
}
}
}
tsdbDestroyCommitH(&ch);
return 0;
}
static int tsdbStartCommit(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
......@@ -234,163 +259,210 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
return false;
}
static int tsdbCommitToFile(SCommitH *pch, SDFileSet *pOldSet, int fid) {
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
int level, id;
int nSet, ver;
STsdbRepo *pRepo;
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
ASSERT(pSet == NULL || pSet->fid == fid);
ASSERT(pOldSet == NULL || pOldSet->fid == fid);
tsdbResetCommitFile(pCommith);
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));
tfsAllocDisk(tsdbGetFidLevel(fid, &(pch->rtn)), &level, &id);
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &level, &id);
if (level == TFS_UNDECIDED_LEVEL) {
// TODO
terrno = TSDB_TDB_NO_AVAIL_DISK;
return -1;
}
if (pOldSet == NULL || level > TSDB_FSET_LEVEL(pOldSet)) {
// Create new fset to commit
tsdbInitDFileSet(&nSet, pRepo, fid, ver, level, id);
if (tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT) < 0) {
// TODO:
return -1;
}
if (tsdbUpdateDFileSetHeader(&nSet) < 0) {
// TODO
return -1;
}
// Set commit file
if (pSet == NULL || level > TSDB_FSET_LEVEL(pSet)) {
tsdbInitDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith), REPO_ID(pRepo), fid, pCommith->version, level, id);
} else {
level = TSDB_FSET_LEVEL(pOldSet);
tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_HEAD), ...);
tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_DATA), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_DATA))
SDFile *pDFile = TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST);
if (pDFile->info.size < 32 * 1024 * 1024) {
tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_LAST))
level = TSDB_FSET_LEVEL(pSet);
id = TSDB_FSET_ID(pSet);
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pWHeadf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_HEAD);
// TSDB_FILE_DATA
SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh));
SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
tsdbInitDFileWithOld(pWDataf, pRDataf);
// TSDB_FILE_LAST
SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh));
SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith);
if (pRLastf->info.size < 32 * 1024) {
tsdbInitDFileWithOld(pWLastf, pRLastf);
} else {
tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST), ...);
tsdbInitDFile(pWLastf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_LAST);
}
tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT);
// TODO: update file header
}
tsdbSetCommitFile(pch, pOldSet, &nSet);
// Open commit file
if (tsdbOpenCommitFile(pCommith, pSet) < 0) {
return -1;
}
for (size_t tid = 0; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = pch->iters + tid;
// Loop to commit each table data
for (int tid = 0; tid < pCommith->niters; tid++) {
SCommitIter *pIter = pCommith->iters + tid;
// No table exists, continue
if (pIter->pTable == NULL) continue;
if (tsdbCommitToTable(pch, tid) < 0) {
// TODO
if (tsdbCommitToTable(pCommith, tid) < 0) {
// TODO: revert the file change
tsdbCloseCommitFile(pCommith, true);
return -1;
}
}
tsdbUpdateDFileSet(pRepo, &wSet);
tsdbCloseCommitFile(pCommith, false);
if (tsdbUpdateDFileSet(pRepo, &(pCommith->wSet)) < 0) {
// TODO
return -1;
}
return 0;
}
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
static SCommitIter *tsdbCreateCommitIters(SCommitH *pCommith) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
if (iters == NULL) {
pCommith->niters = pMem->maxTables;
pCommith->iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
if (pCommith->iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
return -1;
}
if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
if (tsdbRLockRepoMeta(pRepo) < 0) return -1
// reference all tables
for (int i = 0; i < pMem->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
iters[i].pTable = pMeta->tables[i];
pCommith->iters[i].pTable = pMeta->tables[i];
}
}
if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
if (tsdbUnlockRepoMeta(pRepo) < 0) return -1;
for (int i = 0; i < pMem->maxTables; i++) {
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
if ((pCommith->iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(pCommith->iters[i].pTable) == pMem->tData[i]->uid)) {
if ((pCommith->iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
return -1;
}
tSkipListIterNext(iters[i].pIter);
tSkipListIterNext(pCommith->iters[i].pIter);
}
}
return iters;
_err:
tsdbDestroyCommitIters(iters, pMem->maxTables);
return NULL;
return 0;
}
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
if (iters == NULL) return;
static void tsdbDestroyCommitIters(SCommitH *pCommith) {
if (pCommith->iters == NULL) return;
for (int i = 1; i < maxTables; i++) {
if (iters[i].pTable != NULL) {
tsdbUnRefTable(iters[i].pTable);
for (int i = 1; i < pCommith->niters; i++) {
if (pCommith->iters[i].pTable != NULL) {
tsdbUnRefTable(pCommith->iters[i].pTable);
tSkipListDestroyIter(iters[i].pIter);
}
}
free(iters);
free(pCommith->iters);
pCommith->iters = NULL;
}
static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key) {
for (int i = 0; i < nIters; i++) {
SCommitIter *pIter = pIters + i;
// Skip all keys until key (not included)
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
for (int i = 0; i < pCommith->niters; i++) {
SCommitIter *pIter = pCommith->iters + i;
if (pIter->pTable == NULL) continue;
if (pIter->pIter == NULL) continue;
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key-1, INT32_MAX, NULL, NULL, 0, true, NULL);
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, true, NULL);
}
}
static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = &(pRepo->config);
static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pCommith) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pCommith, 0, sizeof(*pCommith));
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype));
}
pch->iters = tsdbCreateCommitIters(pRepo);
if (pch->iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
pCommith->version = REPO_FS_VERSION(pRepo) + 1;
tsdbGetRtnSnap(pRepo, &(pCommith->rtn));
// Init read handle
if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) {
return -1;
}
if (tsdbInitWriteHelper(&(pch->whelper), pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
// Init file iterator
if (tsdbInitFSIter(pRepo, &(pCommith->fsIter)) < 0) {
tsdbDestroyCommitH(pCommith);
return -1;
}
if (tsdbCreateCommitIters(pCommith) < 0) {
tsdbDestroyCommitH(pCommith);
return -1;
}
pCommith->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (pCommith->aBlkIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCommitH(pCommith);
return -1;
}
pCommith->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
if (pCommith->aSupBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCommitH(pCommith);
return -1;
}
if ((pch->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
pCommith->aSubBlk = taosArrayInit(1024, sizeof(SBlock));
if (pCommith->aSubBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
tsdbDestroyCommitH(pCommith);
return -1;
}
pCommith->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
if (pCommith->pDataCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCommitH(pCommith);
return -1;
}
return 0;
}
static void tsdbDestroyCommitH(SCommitH *pch, int niter) {
tdFreeDataCols(pch->pDataCols);
tsdbDestroyCommitIters(pch->iters, niter);
tsdbDestroyHelper(&(pch->whelper));
static void tsdbDestroyCommitH(SCommitH *pCommith) {
pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols);
pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk);
pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk);
pCommith->aBlkIdx = taosArrayDestroy(pCommith->aBlkIdx);
tsdbDestroyCommitIters(pCommith);
tsdbDestroyReadH(&(pCommith->readh));
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}
static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
STsdbCfg *pCfg = &(pRepo->config);
STsdbCfg *pCfg = REPO_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
......@@ -399,9 +471,9 @@ static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = TSDB_KEY_FILEID(minKey, pCfg->daysPerFile, pCfg->precision);
pRtn->midFid = TSDB_KEY_FILEID(midKey, pCfg->daysPerFile, pCfg->precision);
pRtn->maxFid = TSDB_KEY_FILEID(maxKey, pCfg->daysPerFile, pCfg->precision);
pRtn->minFid = TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision);
pRtn->midFid = TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision);
pRtn->maxFid = TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision);
}
static int tsdbGetFidLevel(int fid, SRtn *pRtn) {
......@@ -416,146 +488,154 @@ static int tsdbGetFidLevel(int fid, SRtn *pRtn) {
}
}
static int tsdbNextCommitFid(SCommitIter *iters, int niters) {
int fid = TSDB_IVLD_FID;
static int tsdbNextCommitFid(SCommitH *pCommith) {
SCommitIter *pIter;
STsdbRepo * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo);
int fid = TSDB_IVLD_FID;
// TODO
for (int i = 0; i < pCommith->niters; i++) {
pIter = pCommith->iters + i;
if (pIter->pTable == NULL || pIter->pIter == NULL) continue;
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
if (nextKey == TSDB_DATA_TIMESTAMP_NULL) {
continue;
} else {
int tfid = TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision);
if (fid == TSDB_IVLD_FID || fid > tfid) {
fid = tfid;
}
}
}
return fid;
}
static int tsdbApplyRtn(const SDFileSet oSet, const SRtn *pRtn, SDFileSet *pRSet) {
int level, id;
int vid, ver;
static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
SCommitIter *pIter = pCommith->iters + tid;
if (pIter->pTable == NULL) return 0;
tfsAllocDisk(tsdbGetFidLevel(oSet.fid, pRtn), &level, &id);
TSDB_RLOCK_TABLE(pIter->pTable);
if (level == TFS_UNDECIDED_LEVEL) {
// terrno = TSDB_CODE_TDB_NO_AVAILABLE_DISK;
// Set commit table
tsdbResetCommitTable(pCommith);
if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
if (level > TSDB_FSET_LEVEL(pSet)) {
tsdbInitDFileSet(pRSet, vid, TSDB_FSET_FID(&oSet), ver, level, id);
if (tsdbCopyDFileSet(&oSet, pRSet) < 0) {
return -1;
}
} else {
tsdbInitDFileSetWithOld(pRSet, &oSet);
}
return 0;
}
static int tsdbCommitToTable(SCommitH *pch, int tid) {
SCommitIter *pIter = pch->iters + tid;
if (pIter->pTable == NULL) return 0;
if (!pCommith->isRFileSet) {
if (pIter->pIter == NULL) {
// No memory data
TSDB_RUNLOCK_TABLE(pIter->pTable);
return 0;
} else {
// TODO: think about no data committed at all
if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, true) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
TSDB_RLOCK_TABLE(pIter->pTable);
TSDB_RUNLOCK_TABLE(pIter->pTable);
if (tsdbWriteBlockInfo(pCommith) < 0) {
return -1;
}
tsdbSetCommitTable(pch, pIter->pTable);
return 0;
}
}
// No memory data and no disk data, just return
if (pIter->pIter == NULL && pch->readh.pBlkIdx == NULL) {
if (pIter->pIter == NULL && pCommith->readh.pBlkIdx == NULL) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return 0;
}
if (tsdbLoadBlockInfo(&(pch->readh), NULL) < 0) {
if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
// Process merge commit
int nBlocks = (pch->readh.pBlkIdx == NULL) ? 0 : pch->readh.pBlkIdx->numOfBlocks;
int nBlocks = (pCommith->readh.pBlkIdx == NULL) ? 0 : pCommith->readh.pBlkIdx->numOfBlocks;
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
int cidx = 0;
void * ptr = NULL;
SBlock *pBlock;
if (cidx < nBlocks) {
pBlock = pch->readh.pBlkInfo->blocks + cidx;
pBlock = pCommith->readh.pBlkInfo->blocks + cidx;
} else {
pBlock = NULL;
}
while (true) {
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (pBlock == NULL)) break;
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) && (pBlock == NULL)) break;
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) ||
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) ||
(pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) {
if (tsdbMoveBlock(pch, cidx) < 0) {
if (tsdbMoveBlock(pCommith, cidx) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
cidx++;
if (cidx < nBlocks) {
pBlock = pch->readh.pBlkInfo->blocks + cidx;
pBlock = pCommith->readh.pBlkInfo->blocks + cidx;
} else {
pBlock = NULL;
}
} else if ((cidx < nBlocks) && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
if (tsdbMergeMemData(pch, pIter, cidx) < 0) {
if (tsdbMergeMemData(pCommith, pIter, cidx) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
cidx++;
if (cidx < nBlocks) {
pBlock = pch->readh.pBlkInfo->blocks + cidx;
pBlock = pCommith->readh.pBlkInfo->blocks + cidx;
} else {
pBlock = NULL;
}
nextKey = tsdbNextIterKey(pIter->pIter);
} else {
if (pBlock == NULL) {
if (tsdbCommitMemData(pch, pIter, pch->maxKey, false) < 0) {
if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
nextKey = tsdbNextIterKey(pIter->pIter);
} else {
if (tsdbCommitMemData(pch, pIter, pBlock->keyFirst-1, true) < 0) {
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst-1, true) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
nextKey = tsdbNextIterKey(pIter->pIter);
}
}
#if 0
if (/* Key end */) {
tsdbMoveBlock(); =============
} else {
if (/*block end*/) {
// process append commit until pch->maxKey >>>>>>>
} else {
if (pBlock->last) {
// TODO: merge the block ||||||||||||||||||||||
} else {
if (pBlock > nextKey) {
// process append commit until pBlock->keyFirst-1 >>>>>>
} else if (pBlock < nextKey) {
// tsdbMoveBlock() ============
} else {
// merge the block ||||||||||||
}
}
}
}
#endif
}
TSDB_RUNLOCK_TABLE(pIter->pTable);
if (tsdbWriteBlockInfo(pch) < 0) return -1;
if (tsdbWriteBlockInfo(pCommith) < 0) return -1;
return 0;
}
static int tsdbSetCommitTable(SCommitH *pch, STable *pTable) {
// TODO
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (pCommith->isRFileSet) {
if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) {
return -1;
}
}
return 0;
}
......@@ -572,18 +652,9 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
}
}
static int tsdbAppendCommit(SCommitIter *pIter, TSKEY keyEnd) {
// TODO
return 0;
}
static int tsdbMergeCommit(SCommitIter *pIter, SBlock *pBlock, TSKEY keyEnd) {
// TODO
return 0;
}
static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper) {
// TODO
STsdbCfg * pCfg = &(pHelper->pRepo->config);
SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer);
int64_t offset = 0;
......@@ -1084,4 +1155,46 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
if (pTarget->numOfRows >= maxRows) break;
}
}
static void tsdbResetCommitFile(SCommitH *pCommith) {
tsdbResetCommitTable(pCommith);
taosArrayClear(pCommith->aBlkIdx);
}
static void tsdbResetCommitTable(SCommitH *pCommith) {
tdResetDataCols(pCommith->pDataCols);
taosArrayClear(pCommith->aSubBlk);
taosArrayClear(pCommith->aSupBlk);
}
static int tsdbOpenCommitFile(SCommitH *pCommith, SDFileSet *pRSet) {
if (pRSet == NULL) {
pCommith->isRFileSet = false;
} else {
pCommith->isRFileSet = true;
if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pRSet) < 0) {
return -1;
}
}
if (tsdbOpenDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith), O_WRONLY | O_CREAT) < 0) {
return -1;
}
return 0;
}
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
}
if (!hasError) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype);
fsync(TSDB_FILE_FD(pDFile));
}
}
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}
\ No newline at end of file
......@@ -13,10 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <unistd.h>
#include "tsdbMain.h"
#include "tsdbint.h"
#define REPO_FS(r) ((r)->fs)
#define TSDB_MAX_DFILES(keep, days) ((keep) / (days) + 3)
......
......@@ -118,6 +118,19 @@ void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) {
return buf;
}
static int tsdbCopyDFile(SDFile *pSrc, int tolevel, int toid, SDFile *pDest) {
TSDB_FILE_SET_CLOSED(pDest);
pDest->info = pSrc->info;
tfsInitFile(TSDB_FILE_F(pDest), tolevel, toid, TFILE_REL_NAME(TSDB_FILE_F(pSrc)));
if (taosCopy(TSDB_FILE_FULL_NAME(pSrc), TSDB_FILE_FULL_NAME(pDest)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return -1;
}
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
int tlen = 0;
......@@ -184,8 +197,21 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
return 0;
}
int tsdbCopyDFileSet(SDFileSet *pFromSet, SDFileSet *pToSet) {
// return 0;
int tsdbCopyDFileSet(SDFileSet src, int tolevel, int toid, SDFileSet *pDest) {
ASSERT(tolevel > TSDB_FSET_LEVEL(&src));
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(&src, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
while (ftype >= 0) {
remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pDest, ftype)));
ftype--;
}
return -1;
}
}
return 0;
}
static void tsdbGetFilename(int vid, int fid, int64_t ver, TSDB_FILE_T ftype, char *fname) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册