提交 a72633d7 编写于 作者: H Hongze Cheng

more code

上级 47b96101
......@@ -93,7 +93,7 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
}
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
if (CODE) { \
if ((CODE)) { \
LINO = __LINE__; \
goto LABEL; \
}
......
......@@ -224,7 +224,7 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum);
// STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
......
......@@ -27,20 +27,23 @@ typedef struct {
int8_t sttTrigger;
SArray *aTbDataP;
// context
TSKEY nextKey;
int32_t fid;
int32_t expLevel;
TSKEY minKey;
TSKEY maxKey;
TSKEY nextKey;
int32_t fid;
int32_t expLevel;
TSKEY minKey;
TSKEY maxKey;
struct SFileSet *pFileSet;
// writer
SArray *aFileOp;
struct SSttFWriter *pWriter;
} SCommitter;
static int32_t open_committer_writer(SCommitter *pCommitter) {
int32_t code;
int32_t code = 0;
int32_t lino;
STsdb *pTsdb = pCommitter->pTsdb;
struct SSttFWriterConf conf = {
.pTsdb = pCommitter->pTsdb,
.maxRow = pCommitter->maxRow,
......@@ -51,20 +54,21 @@ static int32_t open_committer_writer(SCommitter *pCommitter) {
.aBuf = NULL,
};
// pCommitter->pTsdb->pFS = NULL;
// taosbsearch(pCommitter->pTsdb->pFS->aFileSet, &pCommitter->fid, tsdbCompareFid, &lino);
struct SFileSet *pSet = NULL;
if (pSet == NULL) {
conf.file = (struct STFile){
.cid = 1,
.fid = pCommitter->fid,
.diskId = (SDiskID){0},
.type = TSDB_FTYPE_STT,
};
tsdbTFileInit(pCommitter->pTsdb, &conf.file);
if (pCommitter->pFileSet) {
ASSERTS(0, "TODO: Not implemented yet");
} else {
// TODO
ASSERT(0);
conf.file.type = TSDB_FTYPE_STT;
if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &conf.file.diskId) < 0) {
code = TSDB_CODE_FS_NO_VALID_DISK;
TSDB_CHECK_CODE(code, lino, _exit);
}
conf.file.size = 0;
conf.file.cid = 1;
conf.file.fid = pCommitter->fid;
tsdbTFileInit(pTsdb, &conf.file);
}
code = tsdbSttFWriterOpen(&conf, &pCommitter->pWriter);
......@@ -72,8 +76,13 @@ static int32_t open_committer_writer(SCommitter *pCommitter) {
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code), pCommitter->fid);
tsdbError( //
"vgId:%d %s failed at line %d since %s, fid:%d", //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code), //
pCommitter->fid);
}
return code;
}
......@@ -92,10 +101,19 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDB
_exit:
if (code) {
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
tsdbError( //
"vgId:%d failed at line %d since %s", //
TD_VID(pCommitter->pTsdb->pVnode), //
lino, //
tstrerror(code));
} else {
tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64,
TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, tbid->suid, tbid->uid, TSDBROW_KEY(pRow).ts,
tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
pCommitter->fid, //
tbid->suid, //
tbid->uid, //
TSDBROW_KEY(pRow).ts, //
TSDBROW_KEY(pRow).version);
}
return 0;
......@@ -206,6 +224,8 @@ static int32_t start_commit_file_set(SCommitter *pCommitter) {
pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
pCommitter->nextKey = TSKEY_MAX;
pCommitter->pFileSet = NULL; // TODO: need to search the file system
tsdbDebug( //
"vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", //
TD_VID(pCommitter->pTsdb->pVnode), //
......@@ -219,13 +239,24 @@ static int32_t start_commit_file_set(SCommitter *pCommitter) {
static int32_t end_commit_file_set(SCommitter *pCommitter) {
int32_t code = 0;
int32_t lino = 0;
int32_t lino;
// TODO
if (pCommitter->pWriter == NULL) return 0;
struct SFileOp *pFileOp = taosArrayReserve(pCommitter->aFileOp, 1);
if (pFileOp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, pFileOp);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid);
}
return code;
}
......
......@@ -34,8 +34,8 @@ typedef enum {
struct SFileOp {
tsdb_fop_t op;
struct STFile oFile; // old file state
struct STFile nFile; // new file state
struct STFile oState; // old file state
struct STFile nState; // new file state
};
struct SFileSet {
......
......@@ -29,15 +29,17 @@ typedef struct {
struct SSttFWriter {
struct SSttFWriterConf config;
// file
struct STFile tFile;
// data
SFSttFooter footer;
SBlockData bData;
SDelBlock dData;
STbStatisBlock sData;
SArray *aSttBlk; // SArray<SSttBlk>
SArray *aDelBlk; // SArray<SDelBlk>
SArray *aStatisBlk; // SArray<STbStatisBlk>
void *bloomFilter;
SFSttFooter footer;
SArray *aSttBlk; // SArray<SSttBlk>
SArray *aDelBlk; // SArray<SDelBlk>
SArray *aStatisBlk; // SArray<STbStatisBlk>
void *bloomFilter; // TODO
// helper data
SSkmInfo skmTb;
SSkmInfo skmRow;
......@@ -71,20 +73,33 @@ static int32_t write_timeseries_block(struct SSttFWriter *pWriter) {
}
// compress data block
code = tCmprBlockData(pBData, pWriter->config.cmprAlg, NULL, NULL, pWriter->config.aBuf, pWriter->aBufSize);
TSDB_CHECK_CODE(code, lino, _exit);
pSttBlk->bInfo.offset = pWriter->config.file.size;
TSDB_CHECK_CODE( //
code = tCmprBlockData( //
pBData, //
pWriter->config.cmprAlg, //
NULL, //
NULL, //
pWriter->config.aBuf, //
pWriter->aBufSize), //
lino, //
_exit);
pSttBlk->bInfo.offset = pWriter->tFile.size;
pSttBlk->bInfo.szKey = pWriter->aBufSize[2] + pWriter->aBufSize[3];
pSttBlk->bInfo.szBlock = pWriter->aBufSize[0] + pWriter->aBufSize[1] + pSttBlk->bInfo.szKey;
for (int32_t iBuf = 3; iBuf >= 0; iBuf--) {
if (pWriter->aBufSize[iBuf]) {
code =
tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, pWriter->config.aBuf[iBuf], pWriter->aBufSize[iBuf]);
TSDB_CHECK_CODE(code, lino, _exit);
pWriter->config.file.size += pWriter->aBufSize[iBuf];
TSDB_CHECK_CODE( //
code = tsdbWriteFile( //
pWriter->pFd, //
pWriter->tFile.size, //
pWriter->config.aBuf[iBuf], //
pWriter->aBufSize[iBuf]), //
lino, //
_exit);
pWriter->tFile.size += pWriter->aBufSize[iBuf];
}
}
......@@ -92,8 +107,12 @@ static int32_t write_timeseries_block(struct SSttFWriter *pWriter) {
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino,
tstrerror(code));
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
} else {
// tsdbTrace();
}
......@@ -106,8 +125,7 @@ static int32_t write_statistics_block(struct SSttFWriter *pWriter) {
STbStatisBlk *pStatisBlk = (STbStatisBlk *)taosArrayReserve(pWriter->aStatisBlk, 1);
if (pStatisBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
pStatisBlk->nRow = pWriter->sData.nRow;
......@@ -121,24 +139,35 @@ static int32_t write_statistics_block(struct SSttFWriter *pWriter) {
if (pStatisBlk->maxVer < pWriter->sData.aData[2][iRow]) pStatisBlk->maxVer = pWriter->sData.aData[2][iRow];
}
pStatisBlk->dp.offset = pWriter->config.file.size;
pStatisBlk->dp.size = 0; // TODO
pStatisBlk->dp.offset = pWriter->tFile.size;
pStatisBlk->dp.size = 0;
// TODO: add compression here
int64_t tsize = sizeof(int64_t) * pWriter->sData.nRow;
for (int32_t i = 0; i < ARRAY_SIZE(pWriter->sData.aData); i++) {
code = tsdbWriteFile(pWriter->pFd, pWriter->config.file.size, (const uint8_t *)pWriter->sData.aData[i], tsize);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = tsdbWriteFile( //
pWriter->pFd, //
pWriter->tFile.size, //
(const uint8_t *)pWriter->sData.aData[i], //
tsize), //
lino, //
_exit);
pStatisBlk->dp.size += tsize;
pWriter->config.file.size += tsize;
pWriter->tFile.size += tsize;
}
tTbStatisBlockClear(&pWriter->sData);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino,
tstrerror(code));
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
} else {
// tsdbTrace();
}
......@@ -274,12 +303,27 @@ static int32_t write_file_header(struct SSttFWriter *pWriter) {
static int32_t create_stt_fwriter(const struct SSttFWriterConf *pConf, struct SSttFWriter **ppWriter) {
int32_t code = 0;
if ((ppWriter[0] = taosMemoryCalloc(1, sizeof(*ppWriter[0]))) == NULL) {
// alloc
if (((ppWriter[0] = taosMemoryCalloc(1, sizeof(*ppWriter[0]))) == NULL) //
|| ((ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk))) == NULL) //
|| ((ppWriter[0]->aDelBlk = taosArrayInit(64, sizeof(SDelBlk))) == NULL) //
|| ((ppWriter[0]->aStatisBlk = taosArrayInit(64, sizeof(STbStatisBlock))) == NULL) //
) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
if ((code = tBlockDataCreate(&ppWriter[0]->bData)) //
|| (code = tDelBlockCreate(&ppWriter[0]->dData, pConf->maxRow)) //
|| (code = tTbStatisBlockCreate(&ppWriter[0]->sData, pConf->maxRow)) //
) {
goto _exit;
}
// init
ppWriter[0]->config = pConf[0];
ppWriter[0]->tFile = pConf->file;
ppWriter[0]->footer.prevFooter = ppWriter[0]->tFile.size;
if (pConf->pSkmTb == NULL) {
ppWriter[0]->config.pSkmTb = &ppWriter[0]->skmTb;
}
......@@ -290,40 +334,14 @@ static int32_t create_stt_fwriter(const struct SSttFWriterConf *pConf, struct SS
ppWriter[0]->config.aBuf = ppWriter[0]->aBuf;
}
// time-series data block
tBlockDataCreate(&ppWriter[0]->bData);
if ((ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// deleted data block
if ((code = tDelBlockCreate(&ppWriter[0]->dData, pConf->maxRow))) goto _exit;
if ((ppWriter[0]->aDelBlk = taosArrayInit(64, sizeof(SDelBlk))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// statistics data block
if ((code = tTbStatisBlockCreate(&ppWriter[0]->sData, pConf->maxRow))) goto _exit;
if ((ppWriter[0]->aStatisBlk = taosArrayInit(64, sizeof(STbStatisBlock))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// TODO: bloom filter
_exit:
if (code && ppWriter[0]) {
// statistics data block
taosArrayDestroy(ppWriter[0]->aStatisBlk);
tTbStatisBlockDestroy(&ppWriter[0]->sData);
// deleted data block
taosArrayDestroy(ppWriter[0]->aDelBlk);
tDelBlockDestroy(&ppWriter[0]->dData);
// time-series data block
taosArrayDestroy(ppWriter[0]->aSttBlk);
tBlockDataDestroy(&ppWriter[0]->bData);
taosArrayDestroy(ppWriter[0]->aStatisBlk);
taosArrayDestroy(ppWriter[0]->aDelBlk);
taosArrayDestroy(ppWriter[0]->aSttBlk);
taosMemoryFree(ppWriter[0]);
ppWriter[0] = NULL;
}
......@@ -354,19 +372,48 @@ static int32_t open_stt_fwriter(struct SSttFWriter *pWriter) {
int32_t lino;
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
int32_t flag = TD_FILE_READ | TD_FILE_WRITE;
if (pWriter->tFile.size == 0) {
flag |= TD_FILE_CREATE | TD_FILE_TRUNC;
}
code = tsdbOpenFile(pWriter->config.file.fname, pWriter->config.szPage, flag, &pWriter->pFd);
code = tsdbOpenFile( //
pWriter->config.file.fname, //
pWriter->config.szPage, //
flag, //
&pWriter->pFd);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(pWriter->pFd, 0, hdr, sizeof(hdr));
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->tFile.size == 0) {
code = tsdbWriteFile( //
pWriter->pFd, //
0, //
hdr, //
sizeof(hdr));
TSDB_CHECK_CODE(code, lino, _exit);
pWriter->tFile.size += sizeof(hdr);
}
_exit:
if (code) {
if (pWriter->pFd) tsdbCloseFile(&pWriter->pFd);
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino,
tstrerror(code));
if (pWriter->pFd) {
tsdbCloseFile(&pWriter->pFd);
}
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
} else {
tsdbDebug( //
"vgId:%d %s done, fname:%s size:%" PRId64, //
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
pWriter->config.file.fname, //
pWriter->config.file.size //
);
}
return code;
}
......@@ -388,8 +435,15 @@ int32_t tsdbSttFWriterOpen(const struct SSttFWriterConf *pConf, struct SSttFWrit
_exit:
if (code) {
if (ppWriter[0]) destroy_stt_fwriter(ppWriter[0]);
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pConf->pTsdb->pVnode), __func__, lino, tstrerror(code));
if (ppWriter[0]) {
destroy_stt_fwriter(ppWriter[0]);
}
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pConf->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
}
return code;
}
......@@ -450,13 +504,17 @@ int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW
if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) {
if (pWriter->bData.nRow > 0) {
code = write_timeseries_block(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = write_timeseries_block(pWriter), //
lino, //
_exit);
}
if (pWriter->sData.nRow >= pWriter->config.maxRow) {
code = write_statistics_block(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = write_statistics_block(pWriter), //
lino, //
_exit);
}
pWriter->sData.aData[0][pWriter->sData.nRow] = tbid->suid; // suid
......@@ -468,25 +526,54 @@ int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW
pWriter->sData.aData[6][pWriter->sData.nRow] = 1; // count
pWriter->sData.nRow++;
code = tsdbUpdateSkmTb(pWriter->config.pTsdb, tbid, pWriter->config.pSkmTb);
TSDB_CHECK_CODE(code, lino, _exit);
TABLEID id = {.suid = tbid->suid, .uid = tbid->suid ? 0 : tbid->uid};
code = tBlockDataInit(&pWriter->bData, &id, pWriter->config.pSkmTb->pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = tsdbUpdateSkmTb( //
pWriter->config.pTsdb, //
tbid, //
pWriter->config.pSkmTb), //
lino, //
_exit);
TABLEID id = {.suid = tbid->suid, //
.uid = tbid->suid //
? 0
: tbid->uid};
TSDB_CHECK_CODE( //
code = tBlockDataInit( //
&pWriter->bData, //
&id, //
pWriter->config.pSkmTb->pTSchema, //
NULL, //
0), //
lino, //
_exit);
}
if (pRow->type == TSDBROW_ROW_FMT) {
code = tsdbUpdateSkmRow(pWriter->config.pTsdb, tbid, TSDBROW_SVERSION(pRow), pWriter->config.pSkmRow);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->config.pSkmRow->pTSchema, tbid->uid);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = tsdbUpdateSkmRow( //
pWriter->config.pTsdb, //
tbid, //
TSDBROW_SVERSION(pRow), //
pWriter->config.pSkmRow), //
lino, //
_exit);
}
TSDB_CHECK_CODE( //
code = tBlockDataAppendRow( //
&pWriter->bData, //
pRow, //
pWriter->config.pSkmRow->pTSchema, //
tbid->uid), //
lino, //
_exit);
if (pWriter->bData.nRow >= pWriter->config.maxRow) {
code = write_timeseries_block(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = write_timeseries_block(pWriter), //
lino, //
_exit);
}
if (key.ts > pWriter->sData.aData[4][pWriter->sData.nRow - 1]) {
......@@ -502,13 +589,19 @@ int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino,
tstrerror(code));
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
}
return code;
}
int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelData) {
ASSERTS(0, "TODO: Not implemented yet");
pWriter->dData.aData[0][pWriter->dData.nRow] = tbid->suid; // suid
pWriter->dData.aData[1][pWriter->dData.nRow] = tbid->uid; // uid
pWriter->dData.aData[2][pWriter->dData.nRow] = pDelData->version; // version
......
......@@ -64,6 +64,7 @@ struct SDelBlk {
int64_t maxVer;
SFDataPtr dp;
};
struct STbStatisBlock {
int32_t capacity;
int32_t nRow;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册