提交 44258669 编写于 作者: H Hongze Cheng

more code

上级 2cf70ab0
......@@ -65,7 +65,7 @@ struct STFile {
struct {
int32_t level;
int32_t nseg;
} stt;
} stt[1];
};
};
......
......@@ -64,7 +64,7 @@ typedef struct SSttFileWriterConfig SSttFileWriterConfig;
int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer);
int32_t tsdbSttFWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp *op);
int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *pRowInfo);
int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row);
int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData);
int32_t tsdbSttFWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData);
......
......@@ -23,56 +23,99 @@ extern "C" {
#endif
// SDelBlock ----------
typedef struct SDelBlock SDelBlock;
typedef struct SDelBlk SDelBlk;
int32_t tDelBlockCreate(SDelBlock *pDelBlock, int32_t capacity);
int32_t tDelBlockDestroy(SDelBlock *pDelBlock);
int32_t tDelBlockClear(SDelBlock *pDelBlock);
int32_t tDelBlockAppend(SDelBlock *pDelBlock, const TABLEID *tbid, const SDelData *pDelData);
typedef union {
int64_t aData[5];
struct {
int64_t suid;
int64_t uid;
int64_t version;
int64_t skey;
int64_t ekey;
};
} SDelRecord;
// STbStatisBlock ----------
typedef struct STbStatisBlock STbStatisBlock;
typedef struct STbStatisBlk STbStatisBlk;
int32_t tTbStatisBlockCreate(STbStatisBlock *pTbStatisBlock, int32_t capacity);
int32_t tTbStatisBlockDestroy(STbStatisBlock *pTbStatisBlock);
int32_t tTbStatisBlockClear(STbStatisBlock *pTbStatisBlock);
// other apis
int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb);
int32_t tsdbUpdateSkmRow(STsdb *pTsdb, const TABLEID *tbid, int32_t sver, SSkmInfo *pSkmRow);
/* Exposed Structs */
// <suid, uid, version, skey, ekey>
struct SDelBlock {
int32_t capacity;
int32_t nRow;
int64_t *aData[5]; // [suid, uid, version, skey, ekey
};
typedef union {
TARRAY2(int64_t) aData[5];
struct {
TARRAY2(int64_t) aSuid[1];
TARRAY2(int64_t) aUid[1];
TARRAY2(int64_t) aVer[1];
TARRAY2(int64_t) aSkey[1];
TARRAY2(int64_t) aEkey[1];
};
} SDelBlock;
struct SDelBlk {
typedef struct SDelBlk {
int32_t nRow;
TABLEID minTid;
TABLEID maxTid;
int64_t minVer;
int64_t maxVer;
SFDataPtr dp;
};
struct STbStatisBlock {
int32_t capacity;
int32_t nRow;
int64_t *aData[7]; // [suid, uid, skey, sver, ekey, ever, count]
};
struct STbStatisBlk {
int32_t nRow;
} SDelBlk;
#define DEL_BLOCK_SIZE(db) TARRAY2_SIZE((db)->aSuid)
int32_t tDelBlockInit(SDelBlock *delBlock);
int32_t tDelBlockFree(SDelBlock *delBlock);
int32_t tDelBlockClear(SDelBlock *delBlock);
int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord);
int32_t tDelBlockEncode(SDelBlock *delBlock, void *buf, int32_t size);
int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock);
// STbStatisBlock ----------
typedef union {
int64_t aData[9];
struct {
int64_t suid;
int64_t uid;
int64_t firstKey;
int64_t firstVer;
int64_t lastKey;
int64_t lastVer;
int64_t minVer;
int64_t maxVer;
int64_t count;
};
} STbStatisRecord;
typedef union {
TARRAY2(int64_t) aData[9];
struct {
TARRAY2(int64_t) suid[1];
TARRAY2(int64_t) uid[1];
TARRAY2(int64_t) firstKey[1];
TARRAY2(int64_t) firstVer[1];
TARRAY2(int64_t) lastKey[1];
TARRAY2(int64_t) lastVer[1];
TARRAY2(int64_t) minVer[1];
TARRAY2(int64_t) maxVer[1];
TARRAY2(int64_t) aCount[1];
};
} STbStatisBlock;
typedef struct STbStatisBlk {
int32_t numRec;
TABLEID minTid;
TABLEID maxTid;
int64_t minVer;
int64_t maxVer;
SFDataPtr dp;
};
} STbStatisBlk;
#define STATIS_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
int32_t tStatisBlockInit(STbStatisBlock *statisBlock);
int32_t tStatisBlockFree(STbStatisBlock *statisBlock);
int32_t tStatisBlockClear(STbStatisBlock *statisBlock);
int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *statisRecord);
int32_t tStatisBlockEncode(STbStatisBlock *statisBlock, void *buf, int32_t size);
int32_t tStatisBlockDecode(const void *buf, STbStatisBlock *statisBlock);
// other apis
int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb);
int32_t tsdbUpdateSkmRow(STsdb *pTsdb, const TABLEID *tbid, int32_t sver, SSkmInfo *pSkmRow);
#ifdef __cplusplus
}
......
......@@ -70,8 +70,8 @@ static int32_t open_writer_with_new_stt(SCommitter *pCommitter) {
config.file.fid = pCommitter->fid;
config.file.cid = pCommitter->eid;
config.file.size = 0;
config.file.stt.level = 0;
config.file.stt.nseg = 0;
config.file.stt->level = 0;
config.file.stt->nseg = 0;
code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -126,7 +126,7 @@ static int32_t open_committer_writer(SCommitter *pCommitter) {
ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0);
STFileObj *fobj = TARRAY2_LAST(&lvl0->farr);
if (fobj->f.stt.nseg >= pCommitter->sttTrigger) {
if (fobj->f.stt->nseg >= pCommitter->sttTrigger) {
return open_writer_with_new_stt(pCommitter);
} else {
return open_writer_with_exist_stt(pCommitter, &fobj->f);
......
......@@ -267,9 +267,9 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
if (code) return code;
if (fobj->f.type == TSDB_FTYPE_STT) {
SSttLvl *lvl = tsdbTFileSetGetLvl(fset, fobj->f.stt.level);
SSttLvl *lvl = tsdbTFileSetGetLvl(fset, fobj->f.stt->level);
if (!lvl) {
code = tsdbSttLvlInit(fobj->f.stt.level, &lvl);
code = tsdbSttLvlInit(fobj->f.stt->level, &lvl);
if (code) return code;
code = TARRAY2_SORT_INSERT(&fset->lvlArr, lvl, tsdbSttLvlCmprFn);
......@@ -285,7 +285,7 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
} else if (op->optype == TSDB_FOP_REMOVE) {
// delete a file
if (op->of.type == TSDB_FTYPE_STT) {
SSttLvl *lvl = tsdbTFileSetGetLvl(fset, op->of.stt.level);
SSttLvl *lvl = tsdbTFileSetGetLvl(fset, op->of.stt->level);
ASSERT(lvl);
STFileObj tfobj = {.f = {.cid = op->of.cid}};
......@@ -305,7 +305,7 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
}
} else {
if (op->nf.type == TSDB_FTYPE_STT) {
SSttLvl *lvl = tsdbTFileSetGetLvl(fset, op->of.stt.level);
SSttLvl *lvl = tsdbTFileSetGetLvl(fset, op->of.stt->level);
ASSERT(lvl);
STFileObj tfobj = {.f = {.cid = op->of.cid}}, *tfobjp = &tfobj;
......
......@@ -130,12 +130,12 @@ static int32_t stt_to_json(const STFile *file, cJSON *json) {
if (code) return code;
/* lvl */
if (cJSON_AddNumberToObject(json, "level", file->stt.level) == NULL) {
if (cJSON_AddNumberToObject(json, "level", file->stt->level) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
/* nseg */
if (cJSON_AddNumberToObject(json, "nseg", file->stt.nseg) == NULL) {
if (cJSON_AddNumberToObject(json, "nseg", file->stt->nseg) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -155,7 +155,7 @@ static int32_t stt_from_json(const cJSON *json, STFile *file) {
/* lvl */
item = cJSON_GetObjectItem(json, "level");
if (cJSON_IsNumber(item)) {
file->stt.level = item->valuedouble;
file->stt->level = item->valuedouble;
} else {
return TSDB_CODE_FILE_CORRUPTED;
}
......@@ -163,7 +163,7 @@ static int32_t stt_from_json(const cJSON *json, STFile *file) {
/* nseg */
item = cJSON_GetObjectItem(json, "nseg");
if (cJSON_IsNumber(item)) {
file->stt.nseg = item->valuedouble;
file->stt->nseg = item->valuedouble;
} else {
return TSDB_CODE_FILE_CORRUPTED;
}
......@@ -290,7 +290,7 @@ bool tsdbIsSameTFile(const STFile *f1, const STFile *f2) {
bool tsdbIsTFileChanged(const STFile *f1, const STFile *f2) {
if (f1->size != f2->size) return true;
if (f1->type == TSDB_FTYPE_STT && f1->stt.nseg != f2->stt.nseg) return true;
if (f1->type == TSDB_FTYPE_STT && f1->stt->nseg != f2->stt->nseg) return true;
return false;
}
......
......@@ -189,7 +189,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
}
fobj = TARRAY2_GET(&lvl->farr, 0);
if (fobj->f.stt.nseg < merger->tsdb->pVnode->config.sttTrigger) {
if (fobj->f.stt->nseg < merger->tsdb->pVnode->config.sttTrigger) {
merger->ctx.toData = false;
break;
} else {
......@@ -249,7 +249,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
.fid = fset->fid,
.cid = merger->cid,
.size = 0,
.stt = {.level = merger->ctx.level, .nseg = 0},
.stt = {{.level = merger->ctx.level, .nseg = 0}},
},
};
code = tsdbSttFWriterOpen(&config, &merger->sttWriter);
......@@ -362,7 +362,7 @@ int32_t tsdbMerge(STsdb *tsdb) {
fobj = TARRAY2_GET(&lvl0->farr, 0);
if (fobj->f.stt.nseg >= sttTrigger) {
if (fobj->f.stt->nseg >= sttTrigger) {
code = tsdbMergeFileSet(&merger, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
......
......@@ -122,7 +122,7 @@ int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **
size = segReader->footer->prevFooter;
}
ASSERT(TARRAY2_SIZE(&reader[0]->segReaderArray) == config->file.stt.nseg);
ASSERT(TARRAY2_SIZE(&reader[0]->segReaderArray) == config->file.stt->nseg);
_exit:
if (code) {
......@@ -250,9 +250,9 @@ struct SSttFileWriter {
SSttFileWriterConfig config[1];
struct {
bool opened;
} ctx;
} ctx[1];
// file
STFile file;
STFile file[1];
// data
TSttBlkArray sttBlkArray[1];
TDelBlkArray delBlkArray[1];
......@@ -262,8 +262,8 @@ struct SSttFileWriter {
SDelBlock dData[1];
STbStatisBlock sData[1];
// helper data
SSkmInfo skmTb;
SSkmInfo skmRow;
SSkmInfo skmTb[1];
SSkmInfo skmRow[1];
int32_t aBufSize[5];
uint8_t *aBuf[5];
STsdbFD *fd;
......@@ -292,15 +292,15 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) {
code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->aBuf, writer->aBufSize);
TSDB_CHECK_CODE(code, lino, _exit);
sttBlk->bInfo.offset = writer->file.size;
sttBlk->bInfo.offset = writer->file->size;
sttBlk->bInfo.szKey = writer->aBufSize[2] + writer->aBufSize[3];
sttBlk->bInfo.szBlock = writer->aBufSize[0] + writer->aBufSize[1] + sttBlk->bInfo.szKey;
for (int32_t i = 3; i >= 0; i--) {
if (writer->aBufSize[i]) {
code = tsdbWriteFile(writer->fd, writer->file.size, writer->config->aBuf[i], writer->aBufSize[i]);
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[i], writer->aBufSize[i]);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file.size += writer->aBufSize[i];
writer->file->size += writer->aBufSize[i];
}
}
tBlockDataClear(writer->bData);
......@@ -317,37 +317,38 @@ _exit:
}
static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
if (writer->sData->nRow == 0) return 0;
if (STATIS_BLOCK_SIZE(writer->sData)) return 0;
int32_t code = 0;
int32_t lino = 0;
STbStatisBlk statisBlk[1];
statisBlk->nRow = writer->sData->nRow;
statisBlk->minTid.suid = writer->sData->aData[0][0];
statisBlk->minTid.uid = writer->sData->aData[1][0];
statisBlk->maxTid.suid = writer->sData->aData[0][writer->sData->nRow - 1];
statisBlk->maxTid.uid = writer->sData->aData[1][writer->sData->nRow - 1];
statisBlk->minVer = statisBlk->maxVer = statisBlk->maxVer = writer->sData->aData[2][0];
for (int32_t iRow = 1; iRow < writer->sData->nRow; iRow++) {
if (statisBlk->minVer > writer->sData->aData[2][iRow]) statisBlk->minVer = writer->sData->aData[2][iRow];
if (statisBlk->maxVer < writer->sData->aData[2][iRow]) statisBlk->maxVer = writer->sData->aData[2][iRow];
}
statisBlk->dp.offset = writer->file.size;
statisBlk->dp.size = 0;
// TODO: add compression here
int64_t tsize = sizeof(int64_t) * writer->sData->nRow;
for (int32_t i = 0; i < ARRAY_SIZE(writer->sData->aData); i++) {
code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)writer->sData->aData[i], tsize);
TSDB_CHECK_CODE(code, lino, _exit);
statisBlk->dp.size += tsize;
writer->file.size += tsize;
}
tTbStatisBlockClear(writer->sData);
STbStatisBlk statisBlk[1] = {{
.numRec = STATIS_BLOCK_SIZE(writer->sData),
.minTid = {.suid = TARRAY2_FIRST(writer->sData->suid), .uid = TARRAY2_FIRST(writer->sData->uid)},
.maxTid = {.suid = TARRAY2_LAST(writer->sData->suid), .uid = TARRAY2_LAST(writer->sData->uid)},
// .minVer = TARRAY2_FIRST(writer->sData->aVer),
// .maxVer = TARRAY2_FIRST(writer->sData->aVer),
}};
// statisBlk->minVer = statisBlk->maxVer = statisBlk->maxVer = writer->sData->aData[2][0];
// for (int32_t iRow = 1; iRow < writer->sData->nRow; iRow++) {
// if (statisBlk->minVer > writer->sData->aData[2][iRow]) statisBlk->minVer = writer->sData->aData[2][iRow];
// if (statisBlk->maxVer < writer->sData->aData[2][iRow]) statisBlk->maxVer = writer->sData->aData[2][iRow];
// }
// statisBlk->dp.offset = writer->file->size;
// statisBlk->dp.size = 0;
// // TODO: add compression here
// int64_t tsize = sizeof(int64_t) * writer->sData->nRow;
// for (int32_t i = 0; i < ARRAY_SIZE(writer->sData->aData); i++) {
// code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->sData->aData[i], tsize);
// TSDB_CHECK_CODE(code, lino, _exit);
// statisBlk->dp.size += tsize;
// writer->file->size += tsize;
// }
tStatisBlockClear(writer->sData);
code = TARRAY2_APPEND_P(writer->statisBlkArray, statisBlk);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -361,6 +362,7 @@ _exit:
}
static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) {
#if 0
if (writer->dData->nRow == 0) return 0;
int32_t code = 0;
......@@ -379,16 +381,16 @@ static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) {
if (delBlk->maxVer < writer->sData->aData[2][iRow]) delBlk->maxVer = writer->sData->aData[2][iRow];
}
delBlk->dp.offset = writer->file.size;
delBlk->dp.offset = writer->file->size;
delBlk->dp.size = 0; // TODO
int64_t tsize = sizeof(int64_t) * writer->dData->nRow;
for (int32_t i = 0; i < ARRAY_SIZE(writer->dData->aData); i++) {
code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)writer->dData->aData[i], tsize);
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->dData->aData[i], tsize);
TSDB_CHECK_CODE(code, lino, _exit);
delBlk->dp.size += tsize;
writer->file.size += tsize;
writer->file->size += tsize;
}
tDelBlockDestroy(writer->dData);
......@@ -403,21 +405,23 @@ _exit:
// tsdbTrace();
}
return code;
#endif
return 0;
}
static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino;
writer->footer->sttBlkPtr->offset = writer->file.size;
writer->footer->sttBlkPtr->offset = writer->file->size;
writer->footer->sttBlkPtr->size = sizeof(SSttBlk) * TARRAY2_SIZE(writer->sttBlkArray);
if (writer->footer->sttBlkPtr->size) {
code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray),
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray),
writer->footer->sttBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file.size += writer->footer->sttBlkPtr->size;
writer->file->size += writer->footer->sttBlkPtr->size;
}
_exit:
......@@ -432,14 +436,14 @@ static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino;
writer->footer->statisBlkPtr->offset = writer->file.size;
writer->footer->statisBlkPtr->offset = writer->file->size;
writer->footer->statisBlkPtr->size = sizeof(STbStatisBlock) * TARRAY2_SIZE(writer->statisBlkArray);
if (writer->footer->statisBlkPtr->size) {
code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray),
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray),
writer->footer->statisBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file.size += writer->footer->statisBlkPtr->size;
writer->file->size += writer->footer->statisBlkPtr->size;
}
_exit:
......@@ -454,14 +458,14 @@ static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino;
writer->footer->delBlkPtr->offset = writer->file.size;
writer->footer->delBlkPtr->offset = writer->file->size;
writer->footer->delBlkPtr->size = sizeof(SDelBlk) * TARRAY2_SIZE(writer->delBlkArray);
if (writer->footer->delBlkPtr->size) {
code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)TARRAY2_DATA(writer->delBlkArray),
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->delBlkArray),
writer->footer->delBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file.size += writer->footer->delBlkPtr->size;
writer->file->size += writer->footer->delBlkPtr->size;
}
_exit:
......@@ -473,8 +477,9 @@ _exit:
}
static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) {
int32_t code = tsdbWriteFile(writer->fd, writer->file.size, (const uint8_t *)&writer->footer, sizeof(writer->footer));
writer->file.size += sizeof(writer->footer);
int32_t code =
tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)&writer->footer, sizeof(writer->footer));
writer->file->size += sizeof(writer->footer);
return code;
}
......@@ -484,39 +489,39 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
int32_t vid = TD_VID(writer->config->tsdb->pVnode);
// set
writer->file = writer->config->file;
writer->file.stt.nseg++;
if (!writer->config->skmTb) writer->config->skmTb = &writer->skmTb;
if (!writer->config->skmRow) writer->config->skmRow = &writer->skmRow;
writer->file[0] = writer->config->file;
writer->file->stt->nseg++;
if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
if (!writer->config->aBuf) writer->config->aBuf = writer->aBuf;
// open file
int32_t flag;
char fname[TSDB_FILENAME_LEN];
if (writer->file.size) {
if (writer->file->size) {
flag = TD_FILE_READ | TD_FILE_WRITE;
} else {
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
}
tsdbTFileName(writer->config->tsdb, &writer->file, fname);
tsdbTFileName(writer->config->tsdb, writer->file, fname);
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd);
TSDB_CHECK_CODE(code, lino, _exit);
if (!writer->file.size) {
if (!writer->file->size) {
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr));
TSDB_CHECK_CODE(code, lino, _exit);
writer->file.size += sizeof(hdr);
writer->file->size += sizeof(hdr);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} else {
writer->ctx.opened = true;
writer->ctx->opened = true;
}
return 0;
}
......@@ -572,11 +577,11 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, STFileOp *op) {
tsdbCloseFile(&writer->fd);
ASSERT(writer->config->file.size > writer->file.size);
ASSERT(writer->config->file.size > writer->file->size);
op->optype = writer->config->file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE;
op->fid = writer->config->file.fid;
op->of = writer->config->file;
op->nf = writer->file;
op->nf = writer->file[0];
_exit:
if (code) {
......@@ -590,8 +595,8 @@ static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
tsdbTFileName(writer->config->tsdb, &writer->config->file, fname);
if (writer->config->file.size) { // truncate the file to the original size
ASSERT(writer->config->file.size <= writer->file.size);
if (writer->config->file.size < writer->file.size) {
ASSERT(writer->config->file.size <= writer->file->size);
if (writer->config->file.size < writer->file->size) {
taosFtruncateFile(writer->fd->pFD, writer->config->file.size);
tsdbCloseFile(&writer->fd);
}
......@@ -604,11 +609,11 @@ static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
}
int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **writer) {
writer[0] = taosMemoryMalloc(sizeof(*writer[0]));
writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
writer[0]->config[0] = config[0];
writer[0]->ctx.opened = false;
writer[0]->ctx->opened = false;
return 0;
}
......@@ -617,7 +622,7 @@ int32_t tsdbSttFWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp *op)
int32_t lino = 0;
int32_t vid = TD_VID(writer[0]->config->tsdb->pVnode);
if (!writer[0]->ctx.opened) {
if (!writer[0]->ctx->opened) {
op->optype = TSDB_FOP_NONE;
} else {
if (abort) {
......@@ -643,65 +648,65 @@ int32_t tsdbSttFWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
int32_t code = 0;
int32_t lino = 0;
if (!writer->ctx.opened) {
if (!writer->ctx->opened) {
code = tsdbSttFWriterDoOpen(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
TABLEID *tbid = (TABLEID *)row;
TSDBROW *pRow = &row->row;
TSDBKEY key = TSDBROW_KEY(pRow);
if (!TABLE_SAME_SCHEMA(writer->bData[0].suid, writer->bData[0].uid, tbid->suid, tbid->uid)) {
if (writer->bData[0].nRow > 0) {
code = tsdbSttFileDoWriteTSDataBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (!TABLE_SAME_SCHEMA(writer->bData->suid, writer->bData->uid, row->suid, row->uid)) {
code = tsdbSttFileDoWriteTSDataBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
if (writer->sData[0].nRow >= writer->config->maxRow) {
if (STATIS_BLOCK_SIZE(writer->sData) >= writer->config->maxRow) {
code = tsdbSttFileDoWriteStatisBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
writer->sData[0].aData[0][writer->sData[0].nRow] = tbid->suid; // suid
writer->sData[0].aData[1][writer->sData[0].nRow] = tbid->uid; // uid
writer->sData[0].aData[2][writer->sData[0].nRow] = key.ts; // skey
writer->sData[0].aData[3][writer->sData[0].nRow] = key.version; // sver
writer->sData[0].aData[4][writer->sData[0].nRow] = key.ts; // ekey
writer->sData[0].aData[5][writer->sData[0].nRow] = key.version; // ever
writer->sData[0].aData[6][writer->sData[0].nRow] = 1; // count
writer->sData[0].nRow++;
STbStatisRecord record[1] = {{
.suid = row->suid,
.uid = row->uid,
.firstKey = key.ts,
.firstVer = key.version,
.lastKey = key.ts,
.lastVer = key.version,
.minVer = key.version,
.maxVer = key.version,
.count = 1,
}};
code = tStatisBlockPut(writer->sData, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbUpdateSkmTb(writer->config->tsdb, tbid, writer->config->skmTb);
code = tsdbUpdateSkmTb(writer->config->tsdb, (TABLEID *)row, writer->config->skmTb);
TSDB_CHECK_CODE(code, lino, _exit);
TABLEID id = {
.suid = tbid->suid,
.uid = tbid->uid ? 0 : tbid->uid,
};
code = tBlockDataInit(&writer->bData[0], &id, writer->config->skmTb->pTSchema, NULL, 0);
TABLEID id = {.suid = row->suid, .uid = row->suid ? 0 : row->uid};
code = tBlockDataInit(writer->bData, &id, writer->config->skmTb->pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (row->row.type == TSDBROW_ROW_FMT) {
code = tsdbUpdateSkmRow(writer->config->tsdb, tbid, TSDBROW_SVERSION(pRow), writer->config->skmRow);
code = tsdbUpdateSkmRow(writer->config->tsdb, (TABLEID *)row, TSDBROW_SVERSION(pRow), writer->config->skmRow);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataAppendRow(&writer->bData[0], pRow, writer->config->skmRow->pTSchema, tbid->uid);
code = tBlockDataAppendRow(writer->bData, pRow, writer->config->skmRow->pTSchema, row->uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (writer->bData[0].nRow >= writer->config->maxRow) {
if (writer->bData->nRow >= writer->config->maxRow) {
code = tsdbSttFileDoWriteTSDataBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (key.ts > writer->sData[0].aData[4][writer->sData[0].nRow - 1]) {
writer->sData[0].aData[4][writer->sData[0].nRow - 1] = key.ts; // ekey
writer->sData[0].aData[5][writer->sData[0].nRow - 1] = key.version; // ever
writer->sData[0].aData[6][writer->sData[0].nRow - 1]++; // count
} else if (key.ts == writer->sData[0].aData[4][writer->sData[0].nRow - 1]) {
writer->sData[0].aData[4][writer->sData[0].nRow - 1] = key.ts; // ekey
writer->sData[0].aData[5][writer->sData[0].nRow - 1] = key.version; // ever
TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key.version);
TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key.version);
if (key.ts > TARRAY2_LAST(writer->sData->lastKey)) {
TARRAY2_LAST(writer->sData->lastKey) = key.ts;
TARRAY2_LAST(writer->sData->lastVer) = key.version;
TARRAY2_LAST(writer->sData->aCount)++;
} else if (key.ts == TARRAY2_LAST(writer->sData->lastKey)) {
TARRAY2_LAST(writer->sData->lastVer) = key.version;
} else {
ASSERTS(0, "timestamp should be in ascending order");
}
......@@ -740,21 +745,22 @@ int32_t tsdbSttFWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDe
ASSERTS(0, "TODO: Not implemented yet");
int32_t code;
if (!writer->ctx.opened) {
if (!writer->ctx->opened) {
code = tsdbSttFWriterDoOpen(writer);
return code;
}
writer->dData[0].aData[0][writer->dData[0].nRow] = tbid->suid; // suid
writer->dData[0].aData[1][writer->dData[0].nRow] = tbid->uid; // uid
writer->dData[0].aData[2][writer->dData[0].nRow] = pDelData->version; // version
writer->dData[0].aData[3][writer->dData[0].nRow] = pDelData->sKey; // skey
writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey; // ekey
writer->dData[0].nRow++;
// writer->dData[0].aData[0][writer->dData[0].nRow] = tbid->suid; // suid
// writer->dData[0].aData[1][writer->dData[0].nRow] = tbid->uid; // uid
// writer->dData[0].aData[2][writer->dData[0].nRow] = pDelData->version; // version
// writer->dData[0].aData[3][writer->dData[0].nRow] = pDelData->sKey; // skey
// writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey; // ekey
// writer->dData[0].nRow++;
if (writer->dData[0].nRow >= writer->config->maxRow) {
return tsdbSttFileDoWriteDelBlock(writer);
} else {
return 0;
}
// if (writer->dData[0].nRow >= writer->config->maxRow) {
// return tsdbSttFileDoWriteDelBlock(writer);
// } else {
// return 0;
// }
return 0;
}
\ No newline at end of file
......@@ -16,67 +16,82 @@
#include "dev.h"
// SDelBlock ----------
int32_t tDelBlockCreate(SDelBlock *pDelBlock, int32_t capacity) {
int32_t code;
memset(pDelBlock, 0, sizeof(*pDelBlock));
pDelBlock->capacity = capacity;
for (int32_t i = 0; i < ARRAY_SIZE(pDelBlock->aData); ++i) {
if ((code = tRealloc((uint8_t **)&pDelBlock->aData[i], sizeof(int64_t) * capacity))) {
for (i--; i >= 0; --i) tFree(pDelBlock->aData[i]);
return code;
}
int32_t tDelBlockInit(SDelBlock *delBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) {
TARRAY2_INIT(&delBlock->aData[i]);
}
return 0;
}
int32_t tDelBlockFree(SDelBlock *delBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) {
TARRAY2_FREE(&delBlock->aData[i]);
}
return 0;
}
int32_t tDelBlockClear(SDelBlock *delBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) {
TARRAY2_CLEAR(&delBlock->aData[i], NULL);
}
return 0;
}
int32_t tDelBlockDestroy(SDelBlock *pDelBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(pDelBlock->aData); ++i) {
tFree(pDelBlock->aData[i]);
int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord) {
for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) {
int32_t code = TARRAY2_APPEND(&delBlock->aData[i], delRecord->aData[i]);
if (code) return code;
}
return 0;
}
int32_t tDelBlockClear(SDelBlock *pDelBlock) {
pDelBlock->nRow = 0;
int32_t tDelBlockEncode(SDelBlock *delBlock, void *buf, int32_t size) {
// TODO
return 0;
}
int32_t tDelBlockAppend(SDelBlock *pDelBlock, const TABLEID *tbid, const SDelData *pDelData) {
ASSERT(pDelBlock->nRow < pDelBlock->capacity);
pDelBlock->aData[0][pDelBlock->nRow] = tbid->suid;
pDelBlock->aData[1][pDelBlock->nRow] = tbid->uid;
pDelBlock->aData[2][pDelBlock->nRow] = pDelData->version;
pDelBlock->aData[3][pDelBlock->nRow] = pDelData->sKey;
pDelBlock->aData[4][pDelBlock->nRow] = pDelData->eKey;
pDelBlock->nRow++;
int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock) {
// TODO
return 0;
}
// STbStatisBlock ----------
int32_t tStatisBlockInit(STbStatisBlock *statisBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) {
TARRAY2_INIT(&statisBlock->aData[i]);
}
return 0;
}
int32_t tTbStatisBlockCreate(STbStatisBlock *pTbStatisBlock, int32_t capacity) {
memset(pTbStatisBlock, 0, sizeof(*pTbStatisBlock));
pTbStatisBlock->capacity = capacity;
for (int32_t i = 0; i < ARRAY_SIZE(pTbStatisBlock->aData); ++i) {
if (tRealloc((uint8_t **)&pTbStatisBlock->aData[i], sizeof(int64_t) * capacity)) {
for (i--; i >= 0; --i) tFree(pTbStatisBlock->aData[i]);
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t tStatisBlockFree(STbStatisBlock *statisBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) {
TARRAY2_FREE(&statisBlock->aData[i]);
}
return 0;
}
int32_t tTbStatisBlockDestroy(STbStatisBlock *pTbStatisBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(pTbStatisBlock->aData); ++i) {
tFree(pTbStatisBlock->aData[i]);
int32_t tStatisBlockClear(STbStatisBlock *statisBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) {
TARRAY2_CLEAR(&statisBlock->aData[i], NULL);
}
return 0;
}
int32_t tTbStatisBlockClear(STbStatisBlock *pTbStatisBlock) {
pTbStatisBlock->nRow = 0;
int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *statisRecord) {
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) {
int32_t code = TARRAY2_APPEND(&statisBlock->aData[i], statisRecord->aData[i]);
if (code) return code;
}
return 0;
}
int32_t tStatisBlockEncode(STbStatisBlock *statisBlock, void *buf, int32_t size) {
// TODO
return 0;
}
int32_t tStatisBlockDecode(const void *buf, STbStatisBlock *statisBlock) {
// TODO
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册