提交 d6028bdd 编写于 作者: H Hongze Cheng

some code

上级 78d674e9
...@@ -202,6 +202,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol ...@@ -202,6 +202,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
uint8_t **ppBuf); uint8_t **ppBuf);
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData, int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
uint8_t **ppBuf); uint8_t **ppBuf);
int32_t tRowInfoCmprFn(const void *p1, const void *p2);
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
// SMemTable // SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
......
...@@ -247,7 +247,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader); ...@@ -247,7 +247,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
// STsdbSnapWriter ======================================== // STsdbSnapWriter ========================================
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter); int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter); int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter);
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
// STqSnapshotReader == // STqSnapshotReader ==
......
...@@ -423,10 +423,10 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) ...@@ -423,10 +423,10 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
// rsma1/rsma2 // rsma1/rsma2
if (pHdr->type == SNAP_DATA_RSMA1) { if (pHdr->type == SNAP_DATA_RSMA1) {
pHdr->type = SNAP_DATA_TSDB; pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[0], pData, nData); code = tsdbSnapWrite(pWriter->pDataWriter[0], pHdr);
} else if (pHdr->type == SNAP_DATA_RSMA2) { } else if (pHdr->type == SNAP_DATA_RSMA2) {
pHdr->type = SNAP_DATA_TSDB; pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData); code = tsdbSnapWrite(pWriter->pDataWriter[1], pHdr);
} else if (pHdr->type == SNAP_DATA_QTASK) { } else if (pHdr->type == SNAP_DATA_QTASK) {
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
} else { } else {
......
...@@ -15,6 +15,280 @@ ...@@ -15,6 +15,280 @@
#include "tsdb.h" #include "tsdb.h"
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
// STsdbDataIter2 ========================================
#define TSDB_MEM_TABLE_DATA_ITER 0
#define TSDB_DATA_FILE_DATA_ITER 1
#define TSDB_STT_FILE_DATA_ITER 2
typedef struct STsdbDataIter2 STsdbDataIter2;
struct STsdbDataIter2 {
STsdbDataIter2* next;
SRBTreeNode rbtn;
int32_t type;
SRowInfo rowInfo;
union {
// TSDB_MEM_TABLE_DATA_ITER
struct {
SMemTable* pMemTable;
} mIter;
// TSDB_DATA_FILE_DATA_ITER
struct {
SDataFReader* pReader;
SArray* aBlockIdx; // SArray<SBlockIdx>
SMapData mDataBlk;
SBlockData bData;
int32_t iBlockIdx;
int32_t iDataBlk;
int32_t iRow;
} dIter;
// TSDB_STT_FILE_DATA_ITER
struct {
SDataFReader* pReader;
int32_t iStt;
SArray* aSttBlk;
SBlockData bData;
int32_t iSttBlk;
int32_t iRow;
} sIter;
};
};
#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2*)(((char*)pNode) - offsetof(STsdbDataIter2, rbtn)))
/* open */
static int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) {
int32_t code = 0;
int32_t lino = 0;
// create handle
STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
if (pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pIter->type = TSDB_DATA_FILE_DATA_ITER;
pIter->dIter.pReader = pReader;
if ((pIter->dIter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataCreate(&pIter->dIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iBlockIdx = -1;
pIter->dIter.iDataBlk = -1;
pIter->dIter.iRow = -1;
// read data
code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pIter->dIter.aBlockIdx) == 0) goto _clear;
_exit:
if (code) {
if (pIter) {
_clear:
tBlockDataDestroy(&pIter->dIter.bData, 1);
taosArrayDestroy(pIter->dIter.aBlockIdx);
taosMemoryFree(pIter);
pIter = NULL;
}
}
*ppIter = pIter;
return code;
}
static int32_t tsdbOpenSttFileDataIter(SDataFReader* pReader, int32_t iStt, STsdbDataIter2** ppIter) {
int32_t code = 0;
int32_t lino = 0;
// create handle
STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
if (pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pIter->type = TSDB_STT_FILE_DATA_ITER;
pIter->sIter.pReader = pReader;
pIter->sIter.iStt = iStt;
pIter->sIter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pIter->sIter.aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataCreate(&pIter->sIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->sIter.iSttBlk = -1;
pIter->sIter.iRow = -1;
// read data
code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pIter->sIter.aSttBlk) == 0) goto _clear;
_exit:
if (code) {
if (pIter) {
_clear:
taosArrayDestroy(pIter->sIter.aSttBlk);
tBlockDataDestroy(&pIter->sIter.bData, 1);
taosMemoryFree(pIter);
pIter = NULL;
}
}
*ppIter = pIter;
return code;
}
/* close */
static void tsdbCloseDataFileDataIter(STsdbDataIter2* pIter) {
tBlockDataDestroy(&pIter->dIter.bData, 1);
tMapDataClear(&pIter->dIter.mDataBlk);
taosArrayDestroy(pIter->dIter.aBlockIdx);
taosMemoryFree(pIter);
}
static void tsdbCloseSttFileDataIter(STsdbDataIter2* pIter) {
tBlockDataDestroy(&pIter->sIter.bData, 1);
taosArrayDestroy(pIter->sIter.aSttBlk);
taosMemoryFree(pIter);
}
static void tsdbCloseDataIter2(STsdbDataIter2* pIter) {
if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
ASSERT(0);
} else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
tsdbCloseDataFileDataIter(pIter);
} else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
tsdbCloseSttFileDataIter(pIter);
} else {
ASSERT(0);
}
}
/* cmpr */
static int32_t tsdbDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
STsdbDataIter2* pIter1 = TSDB_RBTN_TO_DATA_ITER(pNode1);
STsdbDataIter2* pIter2 = TSDB_RBTN_TO_DATA_ITER(pNode2);
return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo);
}
/* seek */
/* iter next */
static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
if (++pIter->dIter.iRow < pIter->dIter.bData.nRow) {
pIter->rowInfo.suid = pIter->dIter.bData.suid;
pIter->rowInfo.uid = pIter->dIter.bData.uid;
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
break;
}
for (;;) {
if (++pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) {
SDataBlk dataBlk;
tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iRow = -1;
break;
}
for (;;) {
if (++pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) {
SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iDataBlk = -1;
break;
} else {
pIter->rowInfo = (SRowInfo){0};
goto _exit;
}
}
}
}
_exit:
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
if (++pIter->sIter.iRow < pIter->sIter.bData.nRow) {
pIter->rowInfo.suid = pIter->sIter.bData.suid;
pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow];
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow);
break;
}
if (++pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) {
SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);
code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->sIter.iRow = -1;
} else {
pIter->rowInfo = (SRowInfo){0};
break;
}
}
_exit:
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter) {
int32_t code = 0;
if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
ASSERT(0);
return code;
} else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
return tsdbDataFileDataIterNext(pIter);
} else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
return tsdbSttFileDataIterNext(pIter);
} else {
ASSERT(0);
return code;
}
}
/* get */
// STsdbSnapReader ======================================== // STsdbSnapReader ========================================
typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT; typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT;
typedef struct { typedef struct {
...@@ -63,8 +337,6 @@ struct STsdbSnapReader { ...@@ -63,8 +337,6 @@ struct STsdbSnapReader {
uint8_t* aBuf[5]; uint8_t* aBuf[5];
}; };
extern int32_t tRowInfoCmprFn(const void* p1, const void* p2);
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
static int32_t tFDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) { static int32_t tFDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
...@@ -632,12 +904,9 @@ _exit: ...@@ -632,12 +904,9 @@ _exit:
// STsdbSnapWriter ======================================== // STsdbSnapWriter ========================================
struct STsdbSnapWriter { struct STsdbSnapWriter {
STsdb* pTsdb; STsdb* pTsdb;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
STsdbFS fs;
// config
int32_t minutes; int32_t minutes;
int8_t precision; int8_t precision;
int32_t minRow; int32_t minRow;
...@@ -646,31 +915,31 @@ struct STsdbSnapWriter { ...@@ -646,31 +915,31 @@ struct STsdbSnapWriter {
int64_t commitID; int64_t commitID;
uint8_t* aBuf[5]; uint8_t* aBuf[5];
// for data file STsdbFS fs;
SBlockData bData;
int32_t fid;
TABLEID id;
SSkmInfo skmTable;
struct {
SDataFReader* pReader;
SArray* aBlockIdx;
int32_t iBlockIdx;
SBlockIdx* pBlockIdx;
SMapData mDataBlk;
int32_t iDataBlk;
SBlockData bData;
int32_t iRow;
} dReader;
struct {
SDataFWriter* pWriter;
SArray* aBlockIdx;
SMapData mDataBlk;
SArray* aSttBlk;
SBlockData bData;
SBlockData sData;
} dWriter;
// for del file // time-series data
SBlockData inData;
int32_t fid;
TABLEID tbid;
SSkmInfo skmTable;
/* reader */
SDataFReader* pDataFReader;
STsdbDataIter2* iterList;
STsdbDataIter2* pDIter;
STsdbDataIter2* pIter;
SRBTree rbt; // SRBTree<STsdbDataIter2>
/* writer */
SDataFWriter* pDataFWriter;
SArray* aBlockIdx;
SMapData mDataBlk; // SMapData<SDataBlk>
SArray* aSttBlk; // SArray<SSttBlk>
SBlockData bData;
SBlockData sData;
// tombstone data
SDelFReader* pDelFReader; SDelFReader* pDelFReader;
SDelFWriter* pDelFWriter; SDelFWriter* pDelFWriter;
int32_t iDelIdx; int32_t iDelIdx;
...@@ -685,7 +954,9 @@ extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, ...@@ -685,7 +954,9 @@ extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData,
static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) { static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
#if 0
ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow); ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow);
if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) { if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) {
...@@ -702,14 +973,20 @@ static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) { ...@@ -702,14 +973,20 @@ static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) {
pWriter->dReader.iDataBlk = 0; // point to the next one pWriter->dReader.iDataBlk = 0; // point to the next one
tBlockDataReset(&pWriter->dReader.bData); tBlockDataReset(&pWriter->dReader.bData);
pWriter->dReader.iRow = 0; pWriter->dReader.iRow = 0;
#endif
_exit: _exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) { static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
#if 0
while (true) { while (true) {
if (pWriter->dReader.pBlockIdx == NULL) break; if (pWriter->dReader.pBlockIdx == NULL) break;
if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break; if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break;
...@@ -726,37 +1003,93 @@ static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) { ...@@ -726,37 +1003,93 @@ static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) {
code = tsdbSnapNextTableData(pWriter); code = tsdbSnapNextTableData(pWriter);
if (code) goto _exit; if (code) goto _exit;
} }
#endif
_exit: _exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) { static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
if (pWriter->pDIter) {
STsdbDataIter2* pIter = pWriter->pDIter;
for (;;) {
if (pIter->dIter.iBlockIdx + 1 >= taosArrayGetSize(pIter->dIter.aBlockIdx)) {
pWriter->pDIter = NULL;
break;
}
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx + 1);
int32_t c = tTABLEIDCmprFn(pBlockIdx, pId);
if (c < 0) {
++pIter->dIter.iBlockIdx;
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
SBlockIdx* pNewBlockIdx = taosArrayReserve(pWriter->aBlockIdx, 1);
if (pNewBlockIdx == NULL) {
code == TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pNewBlockIdx->suid = pBlockIdx->suid;
pNewBlockIdx->uid = pBlockIdx->uid;
code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pIter->dIter.mDataBlk, pBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (c == 0) {
++pIter->dIter.iBlockIdx;
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iDataBlk = -1;
break;
} else {
break;
}
}
}
pWriter->tbid = pId[0];
tMapDataReset(&pWriter->mDataBlk);
#if 0
code = tsdbSnapWriteCopyData(pWriter, pId); code = tsdbSnapWriteCopyData(pWriter, pId);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pWriter->id.suid = pId->suid; pWriter->id.suid = pId->suid;
pWriter->id.uid = pId->uid; pWriter->id.uid = pId->uid;
code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable); code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
tMapDataReset(&pWriter->dWriter.mDataBlk); tMapDataReset(&pWriter->dWriter.mDataBlk);
code = tBlockDataInit(&pWriter->dWriter.bData, pId, pWriter->skmTable.pTSchema, NULL, 0); code = tBlockDataInit(&pWriter->dWriter.bData, pId, pWriter->skmTable.pTSchema, NULL, 0);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
#endif
return code;
_err: _exit:
tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
#if 0
if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code; if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code;
int32_t c = 1; int32_t c = 1;
...@@ -807,123 +1140,174 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { ...@@ -807,123 +1140,174 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
pWriter->id.suid = 0; pWriter->id.suid = 0;
pWriter->id.uid = 0; pWriter->id.uid = 0;
#endif
return code; _exit:
if (code) {
_err: tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) { static int32_t tsdbSnapWriteOpenDataFile(STsdbSnapWriter* pWriter, int32_t fid) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; int32_t lino = 0;
ASSERT(pWriter->pDataFWriter == NULL && pWriter->fid < fid);
ASSERT(pWriter->dWriter.pWriter == NULL); STsdb* pTsdb = pWriter->pTsdb;
pWriter->fid = fid; pWriter->fid = fid;
pWriter->id = (TABLEID){0}; pWriter->tbid = (TABLEID){0};
SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
// Reader // open reader
pWriter->pDataFReader = NULL;
pWriter->iterList = NULL;
pWriter->pDIter = NULL;
pWriter->pIter = NULL;
tRBTreeCreate(&pWriter->rbt, tsdbDataIterCmprFn);
if (pSet) { if (pSet) {
code = tsdbDataFReaderOpen(&pWriter->dReader.pReader, pWriter->pTsdb, pSet); code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadBlockIdx(pWriter->dReader.pReader, pWriter->dReader.aBlockIdx); code = tsdbOpenDataFileDataIter(pWriter->pDataFReader, &pWriter->pDIter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else { if (pWriter->pDIter) {
ASSERT(pWriter->dReader.pReader == NULL); pWriter->pDIter->next = pWriter->iterList;
taosArrayClear(pWriter->dReader.aBlockIdx); pWriter->iterList = pWriter->pDIter;
} }
pWriter->dReader.iBlockIdx = 0; // point to the next one
code = tsdbSnapNextTableData(pWriter);
if (code) goto _err;
// Writer
SHeadFile fHead = {.commitID = pWriter->commitID};
SDataFile fData = {.commitID = pWriter->commitID};
SSmaFile fSma = {.commitID = pWriter->commitID};
SSttFile fStt = {.commitID = pWriter->commitID};
SDFileSet wSet = {.fid = pWriter->fid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
if (pSet) {
wSet.diskId = pSet->diskId;
fData = *pSet->pDataF;
fSma = *pSet->pSmaF;
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
wSet.aSttF[iStt] = pSet->aSttF[iStt]; code = tsdbOpenSttFileDataIter(pWriter->pDataFReader, iStt, &pWriter->pIter);
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->pIter) {
code = tsdbSttFileDataIterNext(pWriter->pIter);
TSDB_CHECK_CODE(code, lino, _exit);
// add to tree
tRBTreePut(&pWriter->rbt, &pWriter->pIter->rbtn);
// add to list
pWriter->pIter->next = pWriter->iterList;
pWriter->iterList = pWriter->pIter;
}
} }
wSet.nSttF = pSet->nSttF + 1; // TODO: fix pSet->nSttF == pTsdb->maxFile
pWriter->pIter = NULL;
}
// open writer
SDiskID diskId;
if (pSet) {
diskId = pSet->diskId;
} else { } else {
SDiskID did = {0}; tfsAllocDisk(pTsdb->pVnode->pTfs, 0 /*TODO*/, &diskId);
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, diskId);
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); }
wSet.diskId = did; SDFileSet wSet = {.diskId = diskId,
wSet.nSttF = 1; .fid = fid,
.pHeadF = &(SHeadFile){.commitID = pWriter->commitID},
.pDataF = (pSet) ? pSet->pDataF : &(SDataFile){.commitID = pWriter->commitID},
.pSmaF = (pSet) ? pSet->pSmaF : &(SSmaFile){.commitID = pWriter->commitID},
.nSttF = 1,
.aSttF = {&(SSttFile){.commitID = pWriter->commitID}}};
code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet);
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->aBlockIdx) {
taosArrayClear(pWriter->aBlockIdx);
} else if ((pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
} }
wSet.aSttF[wSet.nSttF - 1] = &fStt;
code = tsdbDataFWriterOpen(&pWriter->dWriter.pWriter, pWriter->pTsdb, &wSet); tMapDataReset(&pWriter->mDataBlk);
if (code) goto _err;
taosArrayClear(pWriter->dWriter.aBlockIdx);
tMapDataReset(&pWriter->dWriter.mDataBlk);
taosArrayClear(pWriter->dWriter.aSttBlk);
tBlockDataReset(&pWriter->dWriter.bData);
tBlockDataReset(&pWriter->dWriter.sData);
return code; if (pWriter->aSttBlk) {
taosArrayClear(pWriter->aSttBlk);
} else if ((pWriter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
_err: tBlockDataReset(&pWriter->bData);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code),
fid);
} else {
tsdbDebug("vgId:%d %s done, fid:%d", TD_VID(pTsdb->pVnode), __func__, fid);
}
return code; return code;
} }
static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) { static int32_t tsdbSnapWriteCloseDataFile(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
ASSERT(pWriter->dWriter.pWriter); ASSERT(pWriter->pDataFWriter);
code = tsdbSnapWriteTableDataEnd(pWriter); #if 0
if (code) goto _err; // loop write remain data
for (;;) {
SRowInfo* pRowInfo;
// copy remain table data code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbSnapWriteCopyData(pWriter, &id);
if (code) goto _err;
code = if (pRowInfo == NULL) break;
tsdbWriteSttBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.sData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg);
if (code) goto _err;
// Indices code = tsdbSnapWriteTableData(pWriter, pRowInfo);
code = tsdbWriteBlockIdx(pWriter->dWriter.pWriter, pWriter->dWriter.aBlockIdx); TSDB_CHECK_CODE(code, lino, _exit);
if (code) goto _err;
code = tsdbWriteSttBlk(pWriter->dWriter.pWriter, pWriter->dWriter.aSttBlk); code = tsdbSnapWriteNextRow(pWriter);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbUpdateDFileSetHeader(pWriter->dWriter.pWriter); // TODO: write remain data
if (code) goto _err; #endif
code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->dWriter.pWriter->wSet); // do file-level updates
if (code) goto _err; code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFWriterClose(&pWriter->dWriter.pWriter, 1); code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdx);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->dReader.pReader) { code = tsdbUpdateDFileSetHeader(pWriter->pDataFWriter);
code = tsdbDataFReaderClose(&pWriter->dReader.pReader); TSDB_CHECK_CODE(code, lino, _exit);
if (code) goto _err;
code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1);
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->pDataFReader) {
code = tsdbDataFReaderClose(&pWriter->pDataFReader);
TSDB_CHECK_CODE(code, lino, _exit);
} }
_exit: // TODO: do clear sources
return code; {}
_err: _exit:
if (code) {
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s is done", TD_VID(pWriter->pTsdb->pVnode), __func__);
}
return code; return code;
} }
static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) { static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
#if 0
SBlockData* pBData = &pWriter->bData; SBlockData* pBData = &pWriter->bData;
TABLEID id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]}; TABLEID id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]};
TSDBROW row = tsdbRowFromBlockData(pBData, iRow); TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
...@@ -942,10 +1326,10 @@ static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, i ...@@ -942,10 +1326,10 @@ static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, i
int32_t c = tsdbKeyCmprFn(&key, &tKey); int32_t c = tsdbKeyCmprFn(&key, &tKey);
if (c < 0) { if (c < 0) {
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid); code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else if (c > 0) { } else if (c > 0) {
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid); code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -953,7 +1337,7 @@ static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, i ...@@ -953,7 +1337,7 @@ static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, i
if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
pWriter->cmprAlg); pWriter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
if (c < 0) { if (c < 0) {
...@@ -972,25 +1356,25 @@ static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, i ...@@ -972,25 +1356,25 @@ static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, i
if (c < 0) { if (c < 0) {
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
pWriter->cmprAlg); pWriter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk); code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} else if (c > 0) { } else if (c > 0) {
code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid); code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) {
code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk,
pWriter->cmprAlg); pWriter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
*done = 1; *done = 1;
goto _exit; goto _exit;
} else { } else {
code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData); code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pWriter->dReader.iRow = 0; pWriter->dReader.iRow = 0;
pWriter->dReader.iDataBlk++; pWriter->dReader.iDataBlk++;
...@@ -998,18 +1382,20 @@ static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, i ...@@ -998,18 +1382,20 @@ static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, i
} }
} }
} }
#endif
_exit: _exit:
return code; if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
_err: }
tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
return code; return code;
} }
static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
#if 0
TABLEID id = {.suid = pWriter->bData.suid, TABLEID id = {.suid = pWriter->bData.suid,
.uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]}; .uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]};
TSDBROW row = tsdbRowFromBlockData(&pWriter->bData, iRow); TSDBROW row = tsdbRowFromBlockData(&pWriter->bData, iRow);
...@@ -1017,8 +1403,8 @@ static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { ...@@ -1017,8 +1403,8 @@ static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
if (pBData->suid || pBData->uid) { if (pBData->suid || pBData->uid) {
if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) { if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); code = tsdbWriteSttBlock(pWriter->pDataFWriter, pBData, pWriter->aSttBlk, pWriter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
pBData->suid = 0; pBData->suid = 0;
pBData->uid = 0; pBData->uid = 0;
...@@ -1027,105 +1413,263 @@ static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { ...@@ -1027,105 +1413,263 @@ static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
if (pBData->suid == 0 && pBData->uid == 0) { if (pBData->suid == 0 && pBData->uid == 0) {
code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable); code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
TABLEID tid = {.suid = pWriter->id.suid, .uid = pWriter->id.suid ? 0 : pWriter->id.uid}; TABLEID tid = {.suid = pWriter->id.suid, .uid = pWriter->id.suid ? 0 : pWriter->id.uid};
code = tBlockDataInit(pBData, &tid, pWriter->skmTable.pTSchema, NULL, 0); code = tBlockDataInit(pBData, &tid, pWriter->skmTable.pTSchema, NULL, 0);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tBlockDataAppendRow(pBData, &row, NULL, id.uid); code = tBlockDataAppendRow(pBData, &row, NULL, id.uid);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
if (pBData->nRow >= pWriter->maxRow) { if (pBData->nRow >= pWriter->maxRow) {
code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); code = tsdbWriteSttBlock(pWriter->pDataFWriter, pBData, pWriter->aSttBlk, pWriter->cmprAlg);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
} }
#endif
_exit: _exit:
return code; if (code) {
tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
_err: }
return code; return code;
} }
static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { static int32_t tsdbSnapWriteNextRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SBlockData* pBlockData = &pWriter->bData; if (pWriter->pIter) {
TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]}; code = tsdbDataIterNext2(pWriter->pIter);
TSDB_CHECK_CODE(code, lino, _exit);
// End last table data write if need if (pWriter->pIter->rowInfo.suid == 0 && pWriter->pIter->rowInfo.uid == 0) {
if (tTABLEIDCmprFn(&pWriter->id, &id) != 0) { pWriter->pIter = NULL;
code = tsdbSnapWriteTableDataEnd(pWriter); } else {
if (code) goto _err; SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt);
if (pNode) {
int32_t c = tsdbDataIterCmprFn(&pWriter->pIter->rbtn, pNode);
if (c > 0) {
tRBTreePut(&pWriter->rbt, &pWriter->pIter->rbtn);
pWriter->pIter = NULL;
} else if (c == 0) {
ASSERT(0);
}
}
}
} }
// Start new table data write if need if (pWriter->pIter == NULL) {
if (pWriter->id.suid == 0 && pWriter->id.uid == 0) { SRBTreeNode* pNode = tRBTreeMin(&pWriter->rbt);
code = tsdbSnapWriteTableDataStart(pWriter, &id); if (pNode) {
if (code) goto _err; tRBTreeDrop(&pWriter->rbt, pNode);
pWriter->pIter = TSDB_RBTN_TO_DATA_ITER(pNode);
}
} }
// Merge with .data file data if (ppRowInfo) {
int8_t done = 0; if (pWriter->pIter) {
if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) { *ppRowInfo = &pWriter->pIter->rowInfo;
code = tsdbSnapWriteToDataFile(pWriter, iRow, &done); } else {
if (code) goto _err; *ppRowInfo = NULL;
}
} }
// Append to the .stt data block (todo: check if need to set/reload sst block) _exit:
if (!done) { if (code) {
code = tsdbSnapWriteToSttFile(pWriter, iRow); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
if (code) goto _err;
} }
return code;
}
static int32_t tsdbSnapWriteGetRow(STsdbSnapWriter* pWriter, SRowInfo** ppRowInfo) {
int32_t code = 0;
int32_t lino = 0;
if (pWriter->pIter) {
*ppRowInfo = &pWriter->pIter->rowInfo;
goto _exit;
}
code = tsdbSnapWriteNextRow(pWriter, ppRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
}
_err: static int32_t tsdbSnapWriteRowImpl(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); int32_t code = 0;
int32_t lino = 0;
// TODO
ASSERT(0);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
} }
static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; int32_t lino = 0;
SBlockData* pBlockData = &pWriter->bData;
// Decode data code = tBlockDataAppendRow(&pWriter->bData, pRow, NULL, pWriter->tbid.uid);
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; TSDB_CHECK_CODE(code, lino, _exit);
code = tDecmprBlockData(pHdr->data, pHdr->size, pBlockData, pWriter->aBuf);
if (code) goto _err; if (pWriter->bData.nRow >= pWriter->maxRow) {
code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
ASSERT(pBlockData->nRow > 0); static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) {
int32_t code = 0;
int32_t lino = 0;
// Loop to handle each row // switch to new table if need
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { if (pRowInfo->uid != pWriter->tbid.uid) {
TSKEY ts = pBlockData->aTSKEY[iRow]; if (pRowInfo->uid) {
int32_t fid = tsdbKeyFid(ts, pWriter->minutes, pWriter->precision); code = tsdbSnapWriteTableDataEnd(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) { code = tsdbSnapWriteTableDataStart(pWriter, (TABLEID*)pRowInfo);
if (pWriter->dWriter.pWriter) { TSDB_CHECK_CODE(code, lino, _exit);
// ASSERT(fid > pWriter->fid); }
code = tsdbSnapWriteCloseFile(pWriter); // do write the row
if (code) goto _err; if (pWriter->pDIter == NULL /* || false */) {
goto _write_incoming_row;
} else {
for (;;) {
while (pWriter->pDIter->dIter.iRow + 1 < pWriter->pDIter->dIter.bData.nRow) {
TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow + 1);
int32_t c = tsdbRowCmprFn(&pRowInfo->row, &row);
if (c < 0) {
goto _write_incoming_row;
} else if (c > 0) {
++pWriter->pDIter->dIter.iRow;
code = tsdbSnapWriteTableRow(pWriter, &row);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
ASSERT(0);
}
} }
code = tsdbSnapWriteOpenFile(pWriter, fid); while (pWriter->pDIter->dIter.iDataBlk < pWriter->pDIter->dIter.mDataBlk.nItem) {
if (code) goto _err; SDataBlk dataBlk;
}
code = tsdbSnapWriteRowData(pWriter, iRow); tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk + 1, &dataBlk,
if (code) goto _err; tGetDataBlk);
int32_t c = tDataBlkCmprFn(
&dataBlk, &(SDataBlk){.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)});
if (c > 0) {
goto _write_incoming_row;
} else if (c < 0) {
++pWriter->pDIter->dIter.iDataBlk;
tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk);
} else {
++pWriter->pDIter->dIter.iDataBlk;
code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pWriter->pDIter->dIter.iRow = -1;
break;
}
}
}
} }
_write_incoming_row:
code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code; return code;
}
_err: static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
tsdbError("vgId:%d, vnode snapshot tsdb write data for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, int32_t code = 0;
tstrerror(code)); int32_t lino = 0;
code = tDecmprBlockData(pHdr->data, pHdr->size, &pWriter->inData, pWriter->aBuf);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(pWriter->inData.nRow > 0);
// switch to new data file if need
int32_t fid = tsdbKeyFid(pWriter->inData.aTSKEY[0], pWriter->minutes, pWriter->precision);
if (pWriter->fid != fid) {
if (pWriter->pDataFWriter) {
code = tsdbSnapWriteCloseDataFile(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbSnapWriteOpenDataFile(pWriter, fid);
TSDB_CHECK_CODE(code, lino, _exit);
}
// loop write each row
SRowInfo* pRowInfo;
code = tsdbSnapWriteGetRow(pWriter, &pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t iRow = 0; iRow < pWriter->bData.nRow; ++iRow) {
SRowInfo rInfo = {.suid = pWriter->inData.suid,
.uid = pWriter->inData.uid ? pWriter->inData.uid : pWriter->inData.aUid[iRow],
.row = tsdbRowFromBlockData(&pWriter->inData, iRow)};
for (;;) {
if (pRowInfo == NULL) {
code = tsdbSnapWriteTableData(pWriter, &rInfo);
TSDB_CHECK_CODE(code, lino, _exit);
break;
} else {
int32_t c = tRowInfoCmprFn(&rInfo, pRowInfo);
if (c < 0) {
code = tsdbSnapWriteTableData(pWriter, &rInfo);
TSDB_CHECK_CODE(code, lino, _exit);
break;
} else if (c > 0) {
code = tsdbSnapWriteTableData(pWriter, pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbSnapWriteNextRow(pWriter, &pRowInfo);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
ASSERT(0);
}
}
}
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pWriter->pTsdb->pVnode), __func__,
pWriter->inData.suid, pWriter->inData.uid, pWriter->inData.nRow);
}
return code; return code;
} }
...@@ -1159,7 +1703,7 @@ _exit: ...@@ -1159,7 +1703,7 @@ _exit:
return code; return code;
} }
static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t tsdbSnapWriteDelData(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
int32_t code = 0; int32_t code = 0;
STsdb* pTsdb = pWriter->pTsdb; STsdb* pTsdb = pWriter->pTsdb;
...@@ -1186,10 +1730,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 ...@@ -1186,10 +1730,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
taosArrayClear(pWriter->aDelIdxW); taosArrayClear(pWriter->aDelIdxW);
} }
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; TABLEID id = *(TABLEID*)pHdr->data;
TABLEID id = *(TABLEID*)pHdr->data;
ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData);
// Move write data < id // Move write data < id
code = tsdbSnapMoveWriteDelData(pWriter, &id); code = tsdbSnapMoveWriteDelData(pWriter, &id);
...@@ -1208,11 +1749,11 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 ...@@ -1208,11 +1749,11 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
taosArrayClear(pWriter->aDelData); taosArrayClear(pWriter->aDelData);
} }
int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID); int64_t n = sizeof(TABLEID);
while (n < nData) { while (n < pHdr->size) {
SDelData delData; SDelData delData;
n += tGetDelData(pData + n, &delData); n += tGetDelData(pHdr->data + n, &delData);
if (taosArrayPush(pWriter->aDelData, &delData) == NULL) { if (taosArrayPush(pWriter->aDelData, &delData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1275,12 +1816,11 @@ _err: ...@@ -1275,12 +1816,11 @@ _err:
// APIs // APIs
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
STsdbSnapWriter* pWriter = NULL;
// alloc // alloc
pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); STsdbSnapWriter* pWriter = (STsdbSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) { if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -1288,11 +1828,6 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1288,11 +1828,6 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->pTsdb = pTsdb; pWriter->pTsdb = pTsdb;
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
code = tsdbFSCopy(pTsdb, &pWriter->fs);
TSDB_CHECK_CODE(code, lino, _exit);
// config
pWriter->minutes = pTsdb->keepCfg.days; pWriter->minutes = pTsdb->keepCfg.days;
pWriter->precision = pTsdb->keepCfg.precision; pWriter->precision = pTsdb->keepCfg.precision;
pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
...@@ -1300,7 +1835,19 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1300,7 +1835,19 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
pWriter->commitID = pTsdb->pVnode->state.commitID; pWriter->commitID = pTsdb->pVnode->state.commitID;
code = tsdbFSCopy(pTsdb, &pWriter->fs);
TSDB_CHECK_CODE(code, lino, _exit);
// SNAP_DATA_TSDB // SNAP_DATA_TSDB
#if 1
pWriter->fid = INT32_MIN;
code = tBlockDataCreate(&pWriter->inData);
TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataCreate(&pWriter->bData);
TSDB_CHECK_CODE(code, lino, _exit);
#else
code = tBlockDataCreate(&pWriter->bData); code = tBlockDataCreate(&pWriter->bData);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -1330,6 +1877,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1330,6 +1877,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataCreate(&pWriter->dWriter.sData); code = tBlockDataCreate(&pWriter->dWriter.sData);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
#endif
// SNAP_DATA_DEL // SNAP_DATA_DEL
pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx));
...@@ -1354,6 +1902,7 @@ _exit: ...@@ -1354,6 +1902,7 @@ _exit:
*ppWriter = NULL; *ppWriter = NULL;
if (pWriter) { if (pWriter) {
#if 0
if (pWriter->aDelIdxW) taosArrayDestroy(pWriter->aDelIdxW); if (pWriter->aDelIdxW) taosArrayDestroy(pWriter->aDelIdxW);
if (pWriter->aDelData) taosArrayDestroy(pWriter->aDelData); if (pWriter->aDelData) taosArrayDestroy(pWriter->aDelData);
if (pWriter->aDelIdxR) taosArrayDestroy(pWriter->aDelIdxR); if (pWriter->aDelIdxR) taosArrayDestroy(pWriter->aDelIdxR);
...@@ -1366,6 +1915,7 @@ _exit: ...@@ -1366,6 +1915,7 @@ _exit:
tBlockDataDestroy(&pWriter->bData, 1); tBlockDataDestroy(&pWriter->bData, 1);
tsdbFSDestroy(&pWriter->fs); tsdbFSDestroy(&pWriter->fs);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
#endif
} }
} else { } else {
tsdbInfo("vgId:%d, %s done", TD_VID(pTsdb->pVnode), __func__); tsdbInfo("vgId:%d, %s done", TD_VID(pTsdb->pVnode), __func__);
...@@ -1376,8 +1926,8 @@ _exit: ...@@ -1376,8 +1926,8 @@ _exit:
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) { int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
int32_t code = 0; int32_t code = 0;
if (pWriter->dWriter.pWriter) { if (pWriter->pDataFWriter) {
code = tsdbSnapWriteCloseFile(pWriter); code = tsdbSnapWriteCloseDataFile(pWriter);
if (code) goto _exit; if (code) goto _exit;
} }
...@@ -1422,17 +1972,17 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { ...@@ -1422,17 +1972,17 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
// SNAP_DATA_TSDB // SNAP_DATA_TSDB
// Writer // // Writer
tBlockDataDestroy(&pWriter->dWriter.sData, 1); // tBlockDataDestroy(&pWriter->dWriter.sData, 1);
tBlockDataDestroy(&pWriter->dWriter.bData, 1); // tBlockDataDestroy(&pWriter->dWriter.bData, 1);
taosArrayDestroy(pWriter->dWriter.aSttBlk); // taosArrayDestroy(pWriter->dWriter.aSttBlk);
tMapDataClear(&pWriter->dWriter.mDataBlk); // tMapDataClear(&pWriter->dWriter.mDataBlk);
taosArrayDestroy(pWriter->dWriter.aBlockIdx); // taosArrayDestroy(pWriter->dWriter.aBlockIdx);
// Reader // // Reader
tBlockDataDestroy(&pWriter->dReader.bData, 1); // tBlockDataDestroy(&pWriter->dReader.bData, 1);
tMapDataClear(&pWriter->dReader.mDataBlk); // tMapDataClear(&pWriter->dReader.mDataBlk);
taosArrayDestroy(pWriter->dReader.aBlockIdx); // taosArrayDestroy(pWriter->dReader.aBlockIdx);
tBlockDataDestroy(&pWriter->bData, 1); tBlockDataDestroy(&pWriter->bData, 1);
tDestroyTSchema(pWriter->skmTable.pTSchema); tDestroyTSchema(pWriter->skmTable.pTSchema);
...@@ -1453,35 +2003,32 @@ _err: ...@@ -1453,35 +2003,32 @@ _err:
return code; return code;
} }
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr) {
int32_t code = 0; int32_t code = 0;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; int32_t lino = 0;
// ts data
if (pHdr->type == SNAP_DATA_TSDB) { if (pHdr->type == SNAP_DATA_TSDB) {
code = tsdbSnapWriteData(pWriter, pData, nData); code = tsdbSnapWriteTimeSeriesData(pWriter, pHdr);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
goto _exit; goto _exit;
} else { } else if (pWriter->pDataFWriter) {
if (pWriter->dWriter.pWriter) { code = tsdbSnapWriteCloseDataFile(pWriter);
code = tsdbSnapWriteCloseFile(pWriter); TSDB_CHECK_CODE(code, lino, _exit);
if (code) goto _err;
}
} }
// del data
if (pHdr->type == SNAP_DATA_DEL) { if (pHdr->type == SNAP_DATA_DEL) {
code = tsdbSnapWriteDel(pWriter, pData, nData); code = tsdbSnapWriteDelData(pWriter, pHdr);
if (code) goto _err; TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
} }
_exit: _exit:
tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); if (code) {
return code; tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code), pHdr->type, pHdr->index, pHdr->size);
_err: } else {
tsdbError("vgId:%d, tsdb snapshot write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path, tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), __func__,
tstrerror(code)); pHdr->type, pHdr->index, pHdr->size);
}
return code; return code;
} }
...@@ -684,7 +684,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo ...@@ -684,7 +684,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal); tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData; uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = NULL; pColVal->value.pData = NULL;
code = tRealloc(&pColVal->value.pData, pColVal->value.nData); code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
if (code) goto _exit; if (code) goto _exit;
...@@ -753,7 +753,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { ...@@ -753,7 +753,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
code = tRealloc(&tColVal->value.pData, pColVal->value.nData); code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
if (code) return code; if (code) return code;
tColVal->value.nData = pColVal->value.nData; tColVal->value.nData = pColVal->value.nData;
if (pColVal->value.nData) { if (pColVal->value.nData) {
memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData); memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
} }
...@@ -802,7 +802,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { ...@@ -802,7 +802,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData; uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = NULL; pColVal->value.pData = NULL;
code = tRealloc(&pColVal->value.pData, pColVal->value.nData); code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
if (code) goto _exit; if (code) goto _exit;
...@@ -811,7 +811,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { ...@@ -811,7 +811,7 @@ int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData); memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
} }
} }
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
...@@ -822,7 +822,7 @@ _exit: ...@@ -822,7 +822,7 @@ _exit:
return code; return code;
} }
void tRowMergerClear(SRowMerger *pMerger) { void tRowMergerClear(SRowMerger *pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) { for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (IS_VAR_DATA_TYPE(pTColVal->type)) { if (IS_VAR_DATA_TYPE(pTColVal->type)) {
...@@ -830,7 +830,7 @@ void tRowMergerClear(SRowMerger *pMerger) { ...@@ -830,7 +830,7 @@ void tRowMergerClear(SRowMerger *pMerger) {
} }
} }
taosArrayDestroy(pMerger->pArray); taosArrayDestroy(pMerger->pArray);
} }
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
...@@ -853,7 +853,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { ...@@ -853,7 +853,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
pTColVal->value.nData = pColVal->value.nData; pTColVal->value.nData = pColVal->value.nData;
if (pTColVal->value.nData) { if (pTColVal->value.nData) {
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData); memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
} }
pTColVal->flag = 0; pTColVal->flag = 0;
} else { } else {
...@@ -875,7 +875,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { ...@@ -875,7 +875,7 @@ int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
tColVal->value.nData = pColVal->value.nData; tColVal->value.nData = pColVal->value.nData;
if (tColVal->value.nData) { if (tColVal->value.nData) {
memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData); memcpy(tColVal->value.pData, pColVal->value.pData, tColVal->value.nData);
} }
tColVal->flag = 0; tColVal->flag = 0;
} else { } else {
......
...@@ -455,7 +455,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { ...@@ -455,7 +455,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
if (code) goto _err; if (code) goto _err;
} }
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pData, nData); code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr);
if (code) goto _err; if (code) goto _err;
} break; } break;
case SNAP_DATA_TQ_HANDLE: { case SNAP_DATA_TQ_HANDLE: {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册