提交 72038198 编写于 作者: H Hongze Cheng

more code

上级 b88327d7
...@@ -44,7 +44,6 @@ struct STsdbDataIter2 { ...@@ -44,7 +44,6 @@ struct STsdbDataIter2 {
int32_t iBlockIdx; int32_t iBlockIdx;
int32_t iDataBlk; int32_t iDataBlk;
int32_t iRow; int32_t iRow;
} dIter; } dIter;
// TSDB_STT_FILE_DATA_ITER // TSDB_STT_FILE_DATA_ITER
...@@ -1022,8 +1021,18 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI ...@@ -1022,8 +1021,18 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
if (pId) {
pWriter->tbid = *pId;
} else {
pWriter->tbid = (TABLEID){INT64_MAX, INT64_MAX};
}
if (pWriter->pDIter) { if (pWriter->pDIter) {
STsdbDataIter2* pIter = pWriter->pDIter; STsdbDataIter2* pIter = pWriter->pDIter;
ASSERT(pIter->dIter.iRow >= pIter->dIter.bData.nRow);
ASSERT(pIter->dIter.iDataBlk >= pIter->dIter.mDataBlk.nItem);
for (;;) { for (;;) {
if (pIter->dIter.iBlockIdx >= taosArrayGetSize(pIter->dIter.aBlockIdx)) { if (pIter->dIter.iBlockIdx >= taosArrayGetSize(pIter->dIter.aBlockIdx)) {
pWriter->pDIter = NULL; pWriter->pDIter = NULL;
...@@ -1032,10 +1041,8 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI ...@@ -1032,10 +1041,8 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx); SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
int32_t c = tTABLEIDCmprFn(pBlockIdx, pId); int32_t c = tTABLEIDCmprFn(pBlockIdx, &pWriter->tbid);
if (c < 0) { if (c < 0) {
pIter->dIter.iBlockIdx++;
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk); code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -1050,43 +1057,46 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI ...@@ -1050,43 +1057,46 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pNewBlockIdx); code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pNewBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else if (c == 0) {
pIter->dIter.iBlockIdx++;
pIter->dIter.iBlockIdx++;
} else if (c == 0) {
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk); code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iDataBlk = 0; pIter->dIter.iDataBlk = 0;
pIter->dIter.iBlockIdx++;
break; break;
} else { } else {
pIter->dIter.iDataBlk = pIter->dIter.mDataBlk.nItem;
break; break;
} }
} }
} }
pWriter->tbid = pId[0]; if (pId == NULL) {
if (pWriter->sData.nRow) {
tMapDataReset(&pWriter->mDataBlk); code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
#if 0 }
code = tsdbSnapWriteCopyData(pWriter, pId); } else {
TSDB_CHECK_CODE(code, lino, _exit); code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
TSDB_CHECK_CODE(code, lino, _exit);
pWriter->id.suid = pId->suid; tMapDataReset(&pWriter->mDataBlk);
pWriter->id.uid = pId->uid;
code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable); code = tBlockDataInit(&pWriter->bData, pId, pWriter->skmTable.pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
tMapDataReset(&pWriter->dWriter.mDataBlk); // TODO: init pWriter->sData ??
code = tBlockDataInit(&pWriter->dWriter.bData, pId, pWriter->skmTable.pTSchema, NULL, 0); }
TSDB_CHECK_CODE(code, lino, _exit);
#endif
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbTrace("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__, pId->suid,
pId->uid);
} }
return code; return code;
} }
...@@ -1219,6 +1229,7 @@ static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid) ...@@ -1219,6 +1229,7 @@ static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid)
} }
tBlockDataReset(&pWriter->bData); tBlockDataReset(&pWriter->bData);
tBlockDataReset(&pWriter->sData);
_exit: _exit:
if (code) { if (code) {
...@@ -1342,7 +1353,7 @@ static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) { ...@@ -1342,7 +1353,7 @@ static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
code = tBlockDataAppendRow(&pWriter->bData, pRow, NULL, pWriter->tbid.uid); code = tBlockDataAppendRow(&pWriter->bData, pRow, pWriter->skmTable.pTSchema, pWriter->tbid.uid);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->bData.nRow >= pWriter->maxRow) { if (pWriter->bData.nRow >= pWriter->maxRow) {
...@@ -1362,8 +1373,8 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn ...@@ -1362,8 +1373,8 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
int32_t lino = 0; int32_t lino = 0;
// switch to new table if need // switch to new table if need
if (pRowInfo->uid != pWriter->tbid.uid) { if (pRowInfo == NULL || pRowInfo->uid != pWriter->tbid.uid) {
if (pRowInfo->uid) { if (pWriter->tbid.uid != 0) {
code = tsdbSnapWriteTableDataEnd(pWriter); code = tsdbSnapWriteTableDataEnd(pWriter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
...@@ -1372,6 +1383,9 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn ...@@ -1372,6 +1383,9 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
// end with a NULL row
if (pRowInfo == NULL) goto _exit;
// do write the row // do write the row
if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow && if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow &&
pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) { pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) {
...@@ -1397,7 +1411,14 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn ...@@ -1397,7 +1411,14 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
} }
} }
while (pWriter->pDIter->dIter.iDataBlk < pWriter->pDIter->dIter.mDataBlk.nItem) { for (;;) {
if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) {
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
}
// FIXME: Here can be slow, use array instead
SDataBlk dataBlk; SDataBlk dataBlk;
tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk); tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
...@@ -1408,8 +1429,10 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn ...@@ -1408,8 +1429,10 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
goto _exit; goto _exit;
} else if (c < 0) { } else if (c < 0) {
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg); if (pWriter->bData.nRow > 0) {
TSDB_CHECK_CODE(code, lino, _exit); code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk); tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk);
pWriter->pDIter->dIter.iDataBlk++; pWriter->pDIter->dIter.iDataBlk++;
...@@ -1417,8 +1440,8 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn ...@@ -1417,8 +1440,8 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn
code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData); code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
pWriter->pDIter->dIter.iDataBlk++;
pWriter->pDIter->dIter.iRow = 0; pWriter->pDIter->dIter.iRow = 0;
pWriter->pDIter->dIter.iDataBlk++;
break; break;
} }
} }
...@@ -1455,7 +1478,6 @@ static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHd ...@@ -1455,7 +1478,6 @@ static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHd
// loop write each row // loop write each row
SRowInfo* pRowInfo; SRowInfo* pRowInfo;
code = tsdbSnapWriteGetRow(pWriter, &pRowInfo); code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t iRow = 0; iRow < pWriter->bData.nRow; ++iRow) { for (int32_t iRow = 0; iRow < pWriter->bData.nRow; ++iRow) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册