提交 6f30871f 编写于 作者: H Hongze Cheng

more code

上级 e0e7cc4f
...@@ -163,6 +163,7 @@ void tBlockDataDestroy(SBlockData *pBlockData); ...@@ -163,6 +163,7 @@ void tBlockDataDestroy(SBlockData *pBlockData);
int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid); int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid);
void tBlockDataReset(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
int32_t tBlockDataAppendRowEx(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClear(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[], int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[],
......
...@@ -55,6 +55,7 @@ typedef struct STsdbDataIter { ...@@ -55,6 +55,7 @@ typedef struct STsdbDataIter {
typedef struct { typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
int64_t cid; int64_t cid;
int8_t cmprAlg;
int32_t maxRows; int32_t maxRows;
int32_t minRows; int32_t minRows;
STsdbFS fs; STsdbFS fs;
...@@ -72,8 +73,9 @@ typedef struct { ...@@ -72,8 +73,9 @@ typedef struct {
// Writer // Writer
SDataFWriter *pWriter; SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx> SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData mDataBlk; // SMapData<SDataBlk> TABLEID tableId;
SArray *aSttBlk; // SArray<SSttBlk> SMapData mDataBlk; // SMapData<SDataBlk>
SArray *aSttBlk; // SArray<SSttBlk>
} STsdbCompactor; } STsdbCompactor;
#define TSDB_FLG_DEEP_COMPACT 0x1 #define TSDB_FLG_DEEP_COMPACT 0x1
...@@ -210,8 +212,23 @@ _exit: ...@@ -210,8 +212,23 @@ _exit:
} }
static void tsdbDataIterClose(STsdbDataIter *pIter) { static void tsdbDataIterClose(STsdbDataIter *pIter) {
// TODO if (pIter == NULL) return;
ASSERT(0);
if (pIter->flag & TSDB_ITER_TYPE_MEM) {
ASSERT(0);
} else if (pIter->flag & TSDB_ITER_TYPE_DAT) {
ASSERT(0);
} else if (pIter->flag & TSDB_ITER_TYPE_STT) {
SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
tBlockDataDestroy(&pSttDIter->bData);
taosArrayDestroy(pSttDIter->aSttBlk);
tsdbDataFReaderClose(&pSttDIter->pReader);
} else {
ASSERT(0);
}
taosMemoryFree(pIter);
} }
static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId) { static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId) {
...@@ -285,7 +302,8 @@ static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { ...@@ -285,7 +302,8 @@ static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
int32_t lino = 0; int32_t lino = 0;
pCompactor->pTsdb = pTsdb; pCompactor->pTsdb = pTsdb;
// pCompactor->cid = 0; (TODO) pCompactor->cid = 0; // TODO
pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows; pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows; pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
...@@ -536,6 +554,7 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { ...@@ -536,6 +554,7 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
pCompactor->fid = pCompactor->pDFileSet->fid; pCompactor->fid = pCompactor->pDFileSet->fid;
// reader
code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet); code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -567,9 +586,15 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { ...@@ -567,9 +586,15 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
pCompactor->pIter = NULL; pCompactor->pIter = NULL;
tBlockDataReset(&pCompactor->bData); tBlockDataReset(&pCompactor->bData);
// open writers // writer
SDFileSet fSet = {0}; // TODO SDFileSet wSet = {.diskId = (SDiskID){0}, // TODO
code = tsdbDataFWriterOpen(&pCompactor->pWriter, pTsdb, NULL); .fid = pCompactor->pDFileSet->fid,
.pHeadF = &(SHeadFile){.commitID = pCompactor->cid},
.pDataF = &(SDataFile){.commitID = pCompactor->cid},
.pSmaF = &(SSmaFile){.commitID = pCompactor->cid},
.nSttF = 1,
.aSttF = {&(SSttFile){.commitID = pCompactor->cid}}};
code = tsdbDataFWriterOpen(&pCompactor->pWriter, pTsdb, &wSet);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->aBlockIdx == NULL) { if (pCompactor->aBlockIdx == NULL) {
...@@ -604,19 +629,77 @@ _exit: ...@@ -604,19 +629,77 @@ _exit:
} }
static void tsdbCloseCompactor(STsdbCompactor *pCompactor) { static void tsdbCloseCompactor(STsdbCompactor *pCompactor) {
STsdb *pTsdb = pCompactor->pTsdb;
for (STsdbDataIter *pIter = pCompactor->iterList; pIter;) { for (STsdbDataIter *pIter = pCompactor->iterList; pIter;) {
STsdbDataIter *pIterNext = pIter->next; STsdbDataIter *pIterNext = pIter->next;
tsdbDataIterClose(pIter); tsdbDataIterClose(pIter);
pIter = pIterNext; pIter = pIterNext;
} }
// TODO tDestroyTSchema(pCompactor->tbSkm.pTSchema);
ASSERT(0); pCompactor->tbSkm.pTSchema = NULL;
tsdbDataFReaderClose(&pCompactor->pReader);
}
extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg);
static int32_t tsdbCompactWriteBlockData(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
SBlockData *pBData = &pCompactor->bData;
if (pBData->nRow == 0) goto _exit;
if (pBData->uid && pBData->nRow >= pCompactor->minRows) { // write to .data file
code = tsdbWriteDataBlock(pCompactor->pWriter, pBData, &pCompactor->mDataBlk, pCompactor->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->tableId.suid = pBData->suid;
pCompactor->tableId.uid = pBData->uid;
} else { // write to .stt file
code = tsdbWriteSttBlock(pCompactor->pWriter, pBData, pCompactor->aSttBlk, pCompactor->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
tBlockDataClear(&pCompactor->bData);
_exit: _exit:
tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
tstrerror(code));
}
return code;
}
static int32_t tsdbCompactWriteDataBlk(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
if (pCompactor->mDataBlk.nItem == 0) goto _exit;
SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1);
if (pBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pBlockIdx->suid = pCompactor->tableId.suid;
pBlockIdx->uid = pCompactor->tableId.uid;
code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
tMapDataReset(&pCompactor->mDataBlk);
pCompactor->tableId.suid = 0;
pCompactor->tableId.uid = 0;
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino,
tstrerror(code));
}
return code;
} }
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
...@@ -654,31 +737,29 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { ...@@ -654,31 +737,29 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
NULL, 0); NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
if (pCompactor->bData.suid != pRowInfo->suid) { // not same super table if (pCompactor->bData.suid != pRowInfo->suid) { // different super table
if (pCompactor->bData.nRow < pCompactor->minRows) { code = tsdbCompactWriteBlockData(pCompactor);
// TODO: write block data to .stt file, need to check if nRow is 0 TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataClear(&pCompactor->bData);
} else { code = tsdbCompactWriteDataBlk(pCompactor);
// TODO: write block data to .data file, need to check if nRow is 0 TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataClear(&pCompactor->bData);
}
code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema, code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
NULL, 0); NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else if (pCompactor->bData.uid != pRowInfo->uid) { } else if (pCompactor->bData.uid != pRowInfo->uid) { // different table
if (pRowInfo->suid) { // different child table if (pRowInfo->suid) {
if (pCompactor->bData.nRow > pCompactor->minRows) { if (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows) {
// TODO code = tsdbCompactWriteBlockData(pCompactor);
} TSDB_CHECK_CODE(code, lino, _exit);
} else { // different normal table
if (pCompactor->bData.nRow < pCompactor->minRows) {
// TODO: write data to .stt file, need to check if nRow is 0
tBlockDataClear(&pCompactor->bData);
} else {
// TODO: write data to .data file, need to check if nRow is 0
tBlockDataClear(&pCompactor->bData);
} }
} else {
// different normal table
code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCompactWriteDataBlk(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid},
pTSchema, NULL, 0); pTSchema, NULL, 0);
...@@ -688,12 +769,13 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { ...@@ -688,12 +769,13 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
} }
// append row to block data // append row to block data
code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// check if block data is full // check if block data is full
if (pCompactor->bData.nRow >= pCompactor->maxRows) { if (pCompactor->bData.nRow >= pCompactor->maxRows) {
tBlockDataClear(&pCompactor->bData); code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
} }
// iterate to next row // iterate to next row
...@@ -701,25 +783,25 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { ...@@ -701,25 +783,25 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (pCompactor->bData.nRow > 0) { code = tsdbCompactWriteBlockData(pCompactor);
// write again TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbCompactWriteDataBlk(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk); code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->mDataBlk.nItem > 0) { code = tsdbUpdateDFileSetHeader(pCompactor->pWriter);
SBlockIdx *pBlockIdx = taosArrayReserve(pCompactor->aBlockIdx, 1); TSDB_CHECK_CODE(code, lino, _exit);
if (pBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx); code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx); code = tsdbDataFWriterClose(&pCompactor->pWriter, 1);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tsdbCloseCompactor(pCompactor); tsdbCloseCompactor(pCompactor);
......
...@@ -637,7 +637,7 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { ...@@ -637,7 +637,7 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
} }
} else { } else {
ASSERT(0); ASSERT(0);
return NULL; // suppress error report by compiler return NULL; // suppress error report by compiler
} }
} }
...@@ -1070,34 +1070,55 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS ...@@ -1070,34 +1070,55 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
if (pBlockData->uid == 0) { if (pBlockData->uid == 0) {
ASSERT(uid); ASSERT(uid);
code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1)); code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1));
if (code) goto _err; if (code) goto _exit;
pBlockData->aUid[pBlockData->nRow] = uid; pBlockData->aUid[pBlockData->nRow] = uid;
} }
// version // version
code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1)); code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1));
if (code) goto _err; if (code) goto _exit;
pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow); pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow);
// timestamp // timestamp
code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1)); code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1));
if (code) goto _err; if (code) goto _exit;
pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow);
SColVal cv = {0}; SColVal cv = {0};
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
code = tRowAppendToColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData); code = tRowAppendToColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData);
if (code) goto _err; if (code) goto _exit;
} else if (pRow->type == TSDBROW_COL_FMT) { } else if (pRow->type == TSDBROW_COL_FMT) {
code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow); code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow);
if (code) goto _err; if (code) goto _exit;
} else { } else {
ASSERT(0); ASSERT(0);
} }
pBlockData->nRow++; pBlockData->nRow++;
_exit:
return code; return code;
}
_err: int32_t tBlockDataAppendRowEx(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
return code; int32_t code = 0;
ASSERT(pBlockData->suid || pBlockData->uid);
if (pBlockData->nRow == 0) {
pBlockData->uid = uid;
} else if (pBlockData->uid && pBlockData->uid != uid) {
ASSERT(pBlockData->suid);
code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1));
if (code) return code;
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
pBlockData->aUid[iRow] = pBlockData->uid;
}
pBlockData->uid = 0;
}
return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
} }
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) { void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册