提交 da2150a1 编写于 作者: H Hongze Cheng

more code 1

上级 d1ba9a2c
...@@ -686,6 +686,20 @@ struct SDiskData { ...@@ -686,6 +686,20 @@ struct SDiskData {
SArray *aDiskCol; // SArray<SDiskCol> SArray *aDiskCol; // SArray<SDiskCol>
}; };
struct SDiskDataBuilder {
int64_t suid;
int64_t uid;
int32_t nRow;
uint8_t cmprAlg;
uint8_t calcSma;
SCompressor *pUidC;
SCompressor *pVerC;
SCompressor *pKeyC;
int32_t nBuilder;
SArray *aBuilder; // SArray<SDiskColBuilder>
uint8_t *aBuf[2];
};
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr); STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
......
...@@ -70,12 +70,12 @@ typedef struct { ...@@ -70,12 +70,12 @@ typedef struct {
int8_t toLastOnly; int8_t toLastOnly;
}; };
struct { struct {
SDataFWriter *pWriter; SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx> SArray *aBlockIdx; // SArray<SBlockIdx>
SArray *aSttBlk; // SArray<SSttBlk> SArray *aSttBlk; // SArray<SSttBlk>
SMapData mBlock; // SMapData<SDataBlk> SMapData mBlock; // SMapData<SDataBlk>
SBlockData bData; SBlockData bData;
SBlockData bDatal; SDiskDataBuilder *pBuilder;
} dWriter; } dWriter;
SSkmInfo skmTable; SSkmInfo skmTable;
SSkmInfo skmRow; SSkmInfo skmRow;
...@@ -139,7 +139,7 @@ int32_t tsdbBegin(STsdb *pTsdb) { ...@@ -139,7 +139,7 @@ int32_t tsdbBegin(STsdb *pTsdb) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
} }
...@@ -180,7 +180,7 @@ int32_t tsdbCommit(STsdb *pTsdb) { ...@@ -180,7 +180,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
_exit: _exit:
if (code) { if (code) {
tsdbEndCommit(&commith, code); tsdbEndCommit(&commith, code);
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
} }
...@@ -222,7 +222,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { ...@@ -222,7 +222,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d, %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else { } else {
tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode)); tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
} }
...@@ -280,7 +280,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel ...@@ -280,7 +280,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d, %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -314,7 +314,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { ...@@ -314,7 +314,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d, %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -466,7 +466,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { ...@@ -466,7 +466,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -542,7 +542,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -542,7 +542,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
taosArrayClear(pCommitter->dWriter.aSttBlk); taosArrayClear(pCommitter->dWriter.aSttBlk);
tMapDataReset(&pCommitter->dWriter.mBlock); tMapDataReset(&pCommitter->dWriter.mBlock);
tBlockDataReset(&pCommitter->dWriter.bData); tBlockDataReset(&pCommitter->dWriter.bData);
tBlockDataReset(&pCommitter->dWriter.bDatal);
// open iter // open iter
code = tsdbOpenCommitIter(pCommitter); code = tsdbOpenCommitIter(pCommitter);
...@@ -550,7 +549,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -550,7 +549,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
} }
...@@ -602,7 +601,7 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa ...@@ -602,7 +601,7 @@ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapDa
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
} }
...@@ -645,7 +644,7 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray ...@@ -645,7 +644,7 @@ int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
} }
...@@ -681,7 +680,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { ...@@ -681,7 +680,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -707,7 +706,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { ...@@ -707,7 +706,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -734,7 +733,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { ...@@ -734,7 +733,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbDataFReaderClose(&pCommitter->dReader.pReader); tsdbDataFReaderClose(&pCommitter->dReader.pReader);
tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0); tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
} }
...@@ -772,7 +771,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { ...@@ -772,7 +771,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
} }
...@@ -820,12 +819,12 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { ...@@ -820,12 +819,12 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
code = tBlockDataCreate(&pCommitter->dWriter.bData); code = tBlockDataCreate(&pCommitter->dWriter.bData);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataCreate(&pCommitter->dWriter.bDatal); code = tDiskDataBuilderCreate(&pCommitter->dWriter.pBuilder);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -849,7 +848,7 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { ...@@ -849,7 +848,7 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->dWriter.aSttBlk); taosArrayDestroy(pCommitter->dWriter.aSttBlk);
tMapDataClear(&pCommitter->dWriter.mBlock); tMapDataClear(&pCommitter->dWriter.mBlock);
tBlockDataDestroy(&pCommitter->dWriter.bData, 1); tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1); tDiskDataBuilderDestroy(pCommitter->dWriter.pBuilder);
tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema);
} }
...@@ -880,7 +879,7 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { ...@@ -880,7 +879,7 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} }
return code; return code;
} }
...@@ -966,7 +965,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) { ...@@ -966,7 +965,7 @@ static int32_t tsdbCommitDel(SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else { } else {
tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel); tsdbDebug("vgId:%d commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
} }
...@@ -1006,7 +1005,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { ...@@ -1006,7 +1005,7 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else { } else {
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode)); tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
} }
...@@ -1100,7 +1099,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { ...@@ -1100,7 +1099,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -1148,7 +1147,7 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1148,7 +1147,7 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -1235,7 +1234,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) ...@@ -1235,7 +1234,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk)
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -1310,7 +1309,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1310,7 +1309,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -1320,25 +1319,26 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { ...@@ -1320,25 +1319,26 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SBlockData *pBDatal = &pCommitter->dWriter.bDatal; SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder;
if (pBDatal->suid || pBDatal->uid) { if (pBuilder->suid || pBuilder->uid) {
if ((pBDatal->suid != id.suid) || (id.suid == 0)) { if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) {
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); // code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk,
// pCommitter->cmprAlg); // todo
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataReset(pBDatal); // tBlockDataReset(pBDatal);
} }
} }
if (!pBDatal->suid && !pBDatal->uid) { if (!pBuilder->suid && !pBuilder->uid) {
ASSERT(pCommitter->skmTable.suid == id.suid); ASSERT(pCommitter->skmTable.suid == id.suid);
ASSERT(pCommitter->skmTable.uid == id.uid); ASSERT(pCommitter->skmTable.uid == id.uid);
code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); // code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); todo
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -1349,7 +1349,6 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { ...@@ -1349,7 +1349,6 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
int32_t lino = 0; int32_t lino = 0;
SBlockData *pBData = &pCommitter->dWriter.bData; SBlockData *pBData = &pCommitter->dWriter.bData;
SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
TABLEID id = {.suid = pBData->suid, .uid = pBData->uid}; TABLEID id = {.suid = pBData->suid, .uid = pBData->uid};
code = tsdbInitLastBlockIfNeed(pCommitter, id); code = tsdbInitLastBlockIfNeed(pCommitter, id);
...@@ -1357,18 +1356,20 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { ...@@ -1357,18 +1356,20 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
TSDBROW row = tsdbRowFromBlockData(pBData, iRow); TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
code = tBlockDataAppendRow(pBDatal, &row, NULL, pBData->uid);
code = tDiskDataBuilderAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pBDatal->nRow >= pCommitter->maxRow) { if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, NULL /*TODO */, pCommitter->dWriter.aSttBlk,
pCommitter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -1385,60 +1386,79 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { ...@@ -1385,60 +1386,79 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
if (pRowInfo == NULL) goto _exit; if (pRowInfo == NULL) goto _exit;
SBlockData *pBData;
if (pCommitter->toLastOnly) { if (pCommitter->toLastOnly) {
pBData = &pCommitter->dWriter.bDatal; // init the data if need
code = tsdbInitLastBlockIfNeed(pCommitter, id);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
pBData = &pCommitter->dWriter.bData;
ASSERT(pBData->nRow == 0);
}
while (pRowInfo) { while (pRowInfo) {
STSchema *pTSchema = NULL; STSchema *pTSchema = NULL;
if (pRowInfo->row.type == 0) { if (pRowInfo->row.type == 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
TSDB_CHECK_CODE(code, lino, _exit);
pTSchema = pCommitter->skmRow.pTSchema;
}
code = tDiskDataBuilderAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
pTSchema = pCommitter->skmRow.pTSchema;
}
code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid); code = tsdbNextCommitRow(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbNextCommitRow(pCommitter); pRowInfo = tsdbGetCommitRow(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit); if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
pRowInfo = NULL;
}
pRowInfo = tsdbGetCommitRow(pCommitter); if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { // code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk,
pRowInfo = NULL; // pCommitter->cmprAlg); (todo)
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
} else {
SBlockData *pBData = &pCommitter->dWriter.bData;
ASSERT(pBData->nRow == 0);
if (pBData->nRow >= pCommitter->maxRow) { while (pRowInfo) {
if (pCommitter->toLastOnly) { STSchema *pTSchema = NULL;
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); if (pRowInfo->row.type == 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { pTSchema = pCommitter->skmRow.pTSchema;
}
code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbNextCommitRow(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
pRowInfo = NULL;
}
if (pBData->nRow >= pCommitter->maxRow) {
code = code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
}
if (!pCommitter->toLastOnly && pBData->nRow) { if (pBData->nRow) {
if (pBData->nRow > pCommitter->minRow) { if (pBData->nRow > pCommitter->minRow) {
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); code =
TSDB_CHECK_CODE(code, lino, _exit); tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
} else { TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbAppendLastBlock(pCommitter); } else {
TSDB_CHECK_CODE(code, lino, _exit); code = tsdbAppendLastBlock(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
} }
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
...@@ -1495,13 +1515,13 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1495,13 +1515,13 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
code = tsdbMoveCommitData(pCommitter, id); code = tsdbMoveCommitData(pCommitter, id);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, // code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
pCommitter->cmprAlg); // pCommitter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s fail at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
tstrerror(code)); tstrerror(code));
} }
return code; return code;
......
...@@ -34,20 +34,6 @@ struct SDiskColBuilder { ...@@ -34,20 +34,6 @@ struct SDiskColBuilder {
uint8_t *aBuf[1]; uint8_t *aBuf[1];
}; };
struct SDiskDataBuilder {
int64_t suid;
int64_t uid;
int32_t nRow;
uint8_t cmprAlg;
uint8_t calcSma;
SCompressor *pUidC;
SCompressor *pVerC;
SCompressor *pKeyC;
int32_t nBuilder;
SArray *aBuilder; // SArray<SDiskColBuilder>
uint8_t *aBuf[2];
};
// SDiskColBuilder ================================================ // SDiskColBuilder ================================================
#define tDiskColBuilderCreate() \ #define tDiskColBuilderCreate() \
(SDiskColBuilder) { 0 } (SDiskColBuilder) { 0 }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册