提交 313fd6b2 编写于 作者: H Hongze Cheng

partial work

上级 a1139e80
......@@ -350,6 +350,7 @@ typedef struct {
void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo* pInfo,
TSDB_FILE_T ftype);
void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile);
int tsdbOpenDFile(SDFile* pDFile, int flags);
void tsdbCloseDFile(SDFile* pDFile);
int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence);
......@@ -627,57 +628,9 @@ static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
return pBufBlock;
}
// ================= tsdbRWHelper.c
typedef struct {
int32_t tid;
uint32_t len;
uint32_t offset;
uint32_t hasLast : 2;
uint32_t numOfBlocks : 30;
uint64_t uid;
TSKEY maxKey;
} SBlockIdx;
typedef struct {
int64_t last : 1;
int64_t offset : 63;
int32_t algorithm : 8;
int32_t numOfRows : 24;
int32_t len;
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
int16_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
TSKEY keyFirst;
TSKEY keyLast;
} SBlock;
#include "tsdbReadImpl.h"
typedef struct {
int32_t delimiter; // For recovery usage
int32_t tid;
uint64_t uid;
SBlock blocks[];
} SBlockInfo;
typedef struct {
int16_t colId;
int32_t len;
int32_t type : 8;
int32_t offset : 24;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
char padding[2];
} SBlockCol;
typedef struct {
int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage
uint64_t uid; // For recovery usage
SBlockCol cols[];
} SBlockData;
// ================= tsdbRWHelper.c
typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_READ_IMPL_H_
#define _TD_TSDB_READ_IMPL_H_
#include "taosdef.h"
#include "tdataformat.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SReadH SReadH;
typedef struct {
int32_t tid;
uint32_t len;
uint32_t offset;
uint32_t hasLast : 2;
uint32_t numOfBlocks : 30;
uint64_t uid;
TSKEY maxKey;
} SBlockIdx;
typedef struct {
int64_t last : 1;
int64_t offset : 63;
int32_t algorithm : 8;
int32_t numOfRows : 24;
int32_t len;
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
int16_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
TSKEY keyFirst;
TSKEY keyLast;
} SBlock;
typedef struct {
int32_t delimiter; // For recovery usage
int32_t tid;
uint64_t uid;
SBlock blocks[];
} SBlockInfo;
typedef struct {
int16_t colId;
int32_t len;
int32_t type : 8;
int32_t offset : 24;
int64_t sum;
int64_t max;
int64_t min;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
char padding[2];
} SBlockCol;
typedef struct {
int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage
uint64_t uid; // For recovery usage
SBlockCol cols[];
} SBlockData;
struct SReadH {
STsdbRepo * pRepo;
SDFileSet * pSet;
SArray * aBlkIdx;
int cidx;
STable * pTable;
SBlockIdx * pBlockIdx;
SBlockInfo *pBlkInfo;
SBlockData *pBlkData;
SDataCols * pDCols[2];
void * pBuf;
void * pCBuf;
};
#define TSDB_READ_REPO(rh) (rh)->pRepo
#define TSDB_READ_FSET(rh) (rh)->pSet
#define TSDB_READ_BUF(rh) (rh)->pBuf
#define TSDB_READ_COMP_BUF(rh) (rh)->pCBuf
#define TSDB_READ_FSET_IS_SET(rh) ((rh)->pSet != NULL)
#ifdef __cplusplus
}
#endif
#endif /*_TD_TSDB_READ_IMPL_H_*/
\ No newline at end of file
......@@ -14,6 +14,8 @@
*/
#include "tsdbMain.h"
#define TSDB_IVLD_FID INT_MIN
typedef struct {
int minFid;
int midFid;
......@@ -77,6 +79,7 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
SCommitH ch = {0};
SFSIter fsIter = {0};
SDFileSet *pOldSet = NULL;
int fid;
if (pMem->numOfRows <= 0) return 0;
......@@ -84,20 +87,30 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
return -1;
}
// TODO
int sfid = MIN(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/);
int efid = MAX(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/);
tsdbSeekCommitIter(ch.iters, pMem->maxTables, ch.rtn.minKey);
tsdbInitFSIter(pRepo, &fsIter);
pOldSet = tsdbFSIterNext(&fsIter);
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, pOldSet, &ch, fid) < 0) {
tsdbDestroyCommitH(&ch, pMem->maxTables);
return -1;
}
if (pOldSet != NULL && pOldSet->fid == fid) {
fid = tsdbNextCommitFid(ch.iters, pMem->maxTables);
while (true) {
if (pOldSet == NULL && fid == TSDB_IVLD_FID) break;
if (pOldSet == NULL || (fid != TSDB_IVLD_FID && pOldSet->fid > fid)) {
ASSERT(fid >= ch.rtn.minFid);
// commit to new SDFileSet fid
tsdbCommitToFile(pRepo, NULL, &ch, fid);
fid = tsdbNextCommitFid(ch.iters, pMem->maxTables);
} else if (fid != TSDB_IVLD_FID && pOldSet->fid == fid) {
ASSERT(fid >= ch.rtn.minFid);
// commit to fid with old SDFileSet
tsdbCommitToFile(pRepo, pOldSet, &ch, fid);
fid = tsdbNextCommitFid(ch.iters, pMem->maxTables);
pOldSet = tsdbFSIterNext(&fsIter);
} else {
// check if pOldSet need to be changed
tsdbCommitToFile(pRepo, pOldSet, &ch, TSDB_IVLD_FID);
pOldSet = tsdbFSIterNext(&fsIter)
}
}
......@@ -195,115 +208,64 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
}
static int tsdbCommitToFile(STsdbRepo *pRepo, SDFileSet *pOldSet, SCommitH *pch, int fid) {
STsdbCfg * pCfg = &(pRepo->config);
SMemTable *pMem = pRepo->imem;
TSKEY minKey, maxKey;
bool hasData;
SDFileSet rSet, wSet;
SDFileSet rSet;
SDFileSet wSet;
int level;
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
hasData = tsdbHasDataToCommit(pch->iters, pMem->maxTables, minKey, maxKey);
if (pOldSet == NULL || pOldSet->fid != fid) { // need to create SDFileSet and commit
if (!hasData) return 0;
tsdbInitDFileSet(&wSet, REPO_ID(pRepo), fid, 0/*TODO*/, level, TFS_UNDECIDED_ID);
tsdbOpenDFileSet(&wSet, O_WRONLY | O_CREAT);
tsdbUpdateDFileSetHeader(&wSet);
} else {
int level = tsdbGetFidLevel(fid, &(pch->rtn));
// Check if SDFileSet expires
if (level < 0) {
if (hasData) {
tsdbSeekCommitIter(pch->iters, pMem->maxTables, maxKey + 1);
}
return 0;
}
// TODO: Check if SDFileSet in correct level
if (true /*pOldSet level is not the same as level*/) {
tsdbInitDFileSet(&rSet, REPO_ID(pRepo), fid, 0/*TODO*/, level, TFS_UNDECIDED_ID);
// TODO: check if level is correct
tsdbOpenDFileSet(&wSet, O_WRONLY|O_CREAT);
}
}
// TODO: close the file set
if (!hasData) {
tsdbUpdateDFileSet(pRepo, &rSet);
if (pOldSet && pOldSet->fid < pch->rtn.minFid) { // file is deleted
ASSERT(fid == TSDB_IVLD_FID);
return 0;
}
{
// TODO: commit the memory data
}
if (tsdbUpdateDFileSet(pRepo, &wSet) < 0) {
return -1;
}
return 0;
// if (pOldSet) {
// ASSERT(fid == TSDB_IVLD_FID || pOldSet->fid == fid);
// if (true /* TODO: pOldSet not in correct level*/) {
// // TODO: Check if pOldSet is on correct level, if not, move it to correct level
// } else {
// tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_HEAD), REPO_ID(pRepo), fid, 0 /*TODO*/, 0 /*TODO*/, 0
// /*TODO*/,
// NULL, TSDB_FILE_HEAD);
// // TODO: init data
// tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_DATA), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_DATA));
// // TODO: init last file
// SDFile *pDFile = TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_LAST);
// if (pDFile->info->size < 32K) {
// } else {
// }
// tsdbInitDFileWithOld(&oSet, pOldSet);
// pReadSet = &oSet;
// }
// } else {
// ASSERT(fid != TSDB_IVLD_FID);
// // Create a new file group
// tsdbInitDFileSet(&nSet, REPO_ID(pRepo), fid, 0 /*TODO*/, tsdbGetFidLevel(fid, &(pch->rtn)), TFS_UNDECIDED_ID);
// tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT);
// tsdbUpdateDFileSetHeader(&nSet);
// }
#if 0
STsdbCfg * pCfg = &(pRepo->config);
SMemTable *pMem = pRepo->imem;
TSKEY minKey, maxKey;
SDFileSet oldSet = {0};
SDFileSet newSet = {0};
int level;
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
level = tsdbGetFidLevel(fid, &(pch->rtn));
if (pOldSet) { // fset exists, check if the file shold be removed or upgrade tier level
if (level < 0) { // if out of data, remove it and ignore expired memory data
tsdbRemoveExpiredDFileSet(pRepo, fid);
tsdbSeekCommitIter(pch->iters, pMem->maxTables, maxKey + 1);
return 0;
}
// Move the data file set to correct level
tsdbMoveDFileSet(pOldSet, level);
} else { // fset not exist, create the fset
pOldSet = &oldSet;
if (tsdbCreateDFileSet(fid, level, pOldSet) < 0) {
// TODO
return -1;
}
{
// TODO: set rSet and wSet, the read file set and write file set
}
if (tsdbHasDataToCommit(pch->iters, pMem->maxTables, minKey, maxKey)) {
if (tsdbSetAndOpenHelperFile(&(pch->whelper), pOldSet, &newSet) < 0) return -1;
if (tsdbLoadCompIdx(&pch->whelper, NULL) < 0) return -1;
for (int tid = 0; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = pch->iters + tid;
if (pIter->pTable == NULL) continue;
if (tsdbSetHelperTable(&(pch->whelper), pIter->pTable, pRepo) < 0) return -1;
TSDB_RLOCK_TABLE(pIter->pTable);
if (pIter->pIter != NULL) { // has data in memory to commit
// TODO
}
TSDB_RUNLOCK_TABLE(pIter->pTable);
if (tsdbMoveLastBlockIfNeccessary() < 0) return -1;
if (fid == TSDB_IVLD_FID) {
// TODO: copy rSet as wSet
} else {
tsdbSetAndOpenCommitFSet(pch, &rSet, &wSet);
if (tsdbWriteCompInfo() < 0) return -1;
for (int i = 0; i < pMem->maxTable; i++) {
tsdbCommitTableData;
/* code */
}
if (tsdbWriteCompIdx() < 0) return -1;
tsdbCloseAndUnSetCommitFSet(pch);
}
tsdbUpdateDFileSet(pRepo, &newSet);
return 0;
#endif
tsdbUpdateDFileSet(pRepo, &wSet);
}
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
......@@ -425,4 +387,12 @@ static int tsdbGetFidLevel(int fid, SRtn *pRtn) {
} else {
return -1;
}
}
static int tsdbNextCommitFid(SCommitIter *iters, int niters) {
int fid = TSDB_IVLD_FID;
// TODO
return fid;
}
\ No newline at end of file
......@@ -128,6 +128,11 @@ void tsdbInitDFile(SDFile *pDFile, int vid, int fid, int ver, int level, int id,
tfsInitFile(&(pDFile->f), level, id, NULL /*TODO*/);
}
void tsdbInitDFileWithOld(SDFile *pDFile, SDFile *pOldDFile) {
*pDFile = *pOldDFile;
TSDB_FILE_SET_CLOSED(pDFile);
}
int tsdbOpenDFile(SDFile *pDFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pDFile));
......@@ -172,6 +177,18 @@ int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
return nwrite;
}
int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nread = taosRead(pDFile->fd, buf, nbyte);
if (nread < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return nread;
}
int64_t tsdbTellDFile(SDFile *pDFile) { return tsdbSeekDFile(pDFile, 0, SEEK_CUR); }
int tsdbEncodeDFile(void **buf, SDFile *pDFile) {
......@@ -250,7 +267,7 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
return 0;
}
int tsdbMoveDFileSet(SDFileSet *pOldSet, int tolevel, SDFileSet *pNewSet) {
int tsdbMoveDFileSet(SDFileSet *pOldSet, SDFileSet *pNewSet) {
// TODO
return 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tchecksum.h"
#include "tsdbMain.h"
int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) {
// TODO
return 0;
}
void tsdbDestroyReadH(SReadH *pReadh) {
// TODO
}
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) {
// TODO
return 0;
}
void tsdbCloseAndUnsetFSet(SReadH *pReadh) {
// TODO
}
int tsdbLoadBlockIdx(SReadH *pReadh) {
SDFile * pDFile = TSDB_DFILE_IN_SET(TSDB_READ_FSET(pReadh));
SBlockIdx blkIdx;
if (tsdbSeekDFile(pDFile, pDFile->info.offset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s sinces %s", TSDB_READ_REPO_ID(pReadh), ,
tstrerror(terrno));
return -1;
}
int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pDFile->info.len);
if (nread < 0) {
tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s sinces %s", TSDB_READ_REPO_ID(pReadh), ,
tstrerror(terrno));
return -1;
}
if (nread < pDFile->info.len) {
tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s sinces %s", TSDB_READ_REPO_ID(pReadh), ,
tstrerror(terrno));
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), pDFile->info.len)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
void *ptr = TSDB_READ_BUF(pReadh);
while (POINTER_DISTANCE(ptr, TSDB_READ_BUF(pReadh)) < (pDFile->info.len - sizeof(TSCKSUM))) {
ptr = tsdbDecodeSBlockIdx(ptr, &blkIdx);
if (taosArrayPush(pReadh->aBlcIdx, (void *)(&blkIdx)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
return 0;
}
int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
if (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (tdInitDataCols(pReadh->pDCols[1], pSchema) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
size_t size = taosArrayGetSize(pReadh->aBlkIdx);
if (size > 0) {
while (true) {
if (pReadh->cidx >= size) {
pReadh->pBlockIdx = NULL;
break;
}
SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
if (pBlkIdx->tid == TABLE_TID(pTable)) {
if (pBlkIdx->uid == TABLE_UID(pTable)) {
pReadh->pBlockIdx = pBlkIdx;
} else {
pReadh->pBlockIdx = NULL;
}
pReadh->cidx++;
break;
} else if (pBlkIdx->tid > TABLE_TID(pTable)) {
pReadh->pBlockIdx = NULL;
break;
} else {
pReadh->cidx++;
}
}
} else {
pReadh->pBlockIdx = NULL;
}
return 0;
}
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
// TODO
return 0;
}
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pInfo) {
// TODO
return 0;
}
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pInfo, int16_t *colIds, int numOfColsIds) {
// TODO
return 0;
}
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
// TODO
return 0;
}
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
int tlen = 0;
tlen += taosEncodeVariantI32(buf, pIdx->tid);
tlen += taosEncodeVariantU32(buf, pIdx->len);
tlen += taosEncodeVariantU32(buf, pIdx->offset);
tlen += taosEncodeFixedU8(buf, pIdx->hasLast);
tlen += taosEncodeVariantU32(buf, pIdx->numOfBlocks);
tlen += taosEncodeFixedU64(buf, pIdx->uid);
tlen += taosEncodeFixedU64(buf, pIdx->maxKey);
return tlen;
}
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
uint8_t hasLast = 0;
uint32_t numOfBlocks = 0;
uint64_t value = 0;
if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL;
if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL;
pIdx->hasLast = hasLast;
if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL;
pIdx->numOfBlocks = numOfBlocks;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->uid = (int64_t)value;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->maxKey = (TSKEY)value;
return buf;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册