提交 8ae2ab1c 编写于 作者: H Hongze Cheng

more work

上级 33c3f34f
......@@ -138,22 +138,23 @@ int32_t tGetColData(uint8_t *p, SColData *pColData);
#define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA))
#define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA))
int32_t tBlockDataCreate(SBlockData *pBlockData);
void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear);
void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClearData(SBlockData *pBlockData);
int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
int32_t tBlockDataCreate(SBlockData *pBlockData);
void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema);
int32_t tBlockDataInitEx(SBlockData *pBlockData, int64_t *suid, int64_t uid, SArray *aColId);
void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClear(SBlockData *pBlockData);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
#if 1
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
#endif
// SDiskDataHdr
int32_t tPutDiskDataHdr(uint8_t *p, void *ph);
int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
......@@ -190,7 +191,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
uint8_t **ppBuf);
int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData,
uint8_t **ppBuf);
int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size);
int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck);
// tsdbMemTable ==============================================================================================
// SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
......
......@@ -539,7 +539,7 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) {
if (code) goto _err;
// clear
tBlockDataClearData(pBlockData);
tBlockDataClear(pBlockData);
return code;
......@@ -578,7 +578,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
}
// clear
tBlockDataClearData(pBlockData);
tBlockDataClear(pBlockData);
return code;
......@@ -596,7 +596,7 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S
code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBlockDataR, NULL, 0);
if (code) goto _err;
tBlockDataClearData(pBlockDataW);
tBlockDataClear(pBlockDataW);
int32_t iRow = 0;
TSDBROW row;
TSDBROW *pRow1 = tsdbTbDataIterGet(pIter);
......@@ -672,7 +672,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
STbData *pTbData = pIter->pTbData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
tBlockDataClearData(pBlockData);
tBlockDataClear(pBlockData);
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
while (true) {
if (pRow == NULL) {
......@@ -741,7 +741,7 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S
STbData *pTbData = pIter->pTbData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
tBlockDataClearData(pBlockData);
tBlockDataClear(pBlockData);
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
while (true) {
if (pRow == NULL) break;
......
......@@ -1748,7 +1748,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClearData(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData);
code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -2208,7 +2208,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
// 3. load the neighbor block, and set it to be the currently accessed file data block
tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClearData(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData);
int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -2871,7 +2871,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClearData(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData);
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
tBlockDataDestroy(&pStatus->fileBlockData, 1);
......
......@@ -731,13 +731,14 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
int32_t nColId, SBlockData *pBlockData) {
int32_t code = 0;
// TODO
tBlockDataReset(pBlockData);
ASSERT(pBlockData->suid || pBlockData->uid);
tBlockDataClear(pBlockData);
TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD;
// uid + version + tskey
code = tsdbReadAndCheckFile(pFD, pBlkInfo->offset, &pReader->pBuf1, pBlkInfo->szKey);
code = tsdbReadAndCheckFile(pFD, pBlkInfo->offset, &pReader->pBuf1, pBlkInfo->szKey, 1);
if (code) goto _err;
SDiskDataHdr hdr;
uint8_t *p = pReader->pBuf1 + tGetDiskDataHdr(pReader->pBuf1, &hdr);
......@@ -776,8 +777,8 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
// read and decode columns
if (hdr.szBlkCol > 0) {
code =
tsdbReadAndCheckFile(pFD, pBlkInfo->offset + pBlkInfo->szKey, &pReader->pBuf1, hdr.szBlkCol + sizeof(TSCKSUM));
code = tsdbReadAndCheckFile(pFD, pBlkInfo->offset + pBlkInfo->szKey, &pReader->pBuf1,
hdr.szBlkCol + sizeof(TSCKSUM), 1);
if (code) goto _err;
int32_t n = 0;
......@@ -797,7 +798,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
} else {
code = tsdbReadAndCheckFile(
pFD, pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + blockCol.offset, &pReader->pBuf2,
blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM));
blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM), 1);
code = tsdbDecmprColData(pReader->pBuf2, &blockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->pBuf3);
if (code) goto _err;
......
......@@ -482,7 +482,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
tBlockDataClear(&pWriter->bDataW);
}
pWriter->iRow++;
......@@ -675,7 +675,7 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
tBlockDataClear(&pWriter->bDataW);
}
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
......@@ -725,7 +725,7 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClearData(&pWriter->bDataW);
tBlockDataClear(&pWriter->bDataW);
}
return code;
......
......@@ -1145,31 +1145,54 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) {
pBlockData->aColData = NULL;
}
void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->suid = 0;
pBlockData->uid = 0;
int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema) {
int32_t code = 0;
ASSERT(suid || uid);
pBlockData->suid = suid;
pBlockData->uid = uid;
pBlockData->nRow = 0;
taosArrayClear(pBlockData->aIdx);
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) {
STColumn *pTColumn = &pTSchema->columns[iColumn];
SColData *pColData;
code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
}
_exit:
return code;
}
int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid) {
int32_t tBlockDataInitEx(SBlockData *pBlockData, int64_t *suid, int64_t uid, SArray *aColId) {
int32_t code = 0;
ASSERT(suid || uid);
tBlockDataReset(pBlockData);
pBlockData->suid = suid;
pBlockData->uid = uid;
pBlockData->nRow = 0;
if (pTSchema) {
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) {
STColumn *pTColumn = &pTSchema->columns[iColumn];
SColData *pColData;
taosArrayClear(pBlockData->aIdx);
if (aColId) {
int16_t lcid = -1;
for (int32_t iColId = 0; iColId < taosArrayGetSize(aColId); iColId++) {
int16_t cid = *(int16_t *)taosArrayGet(aColId, iColId);
code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData);
ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID);
ASSERT(cid > lcid);
lcid = cid;
SColData *pColData;
code = tBlockDataAddColData(pBlockData, iColId, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) != 0);
tColDataInit(pColData, cid, TSDB_DATA_TYPE_NULL, -1);
}
}
......@@ -1177,7 +1200,14 @@ _exit:
return code;
}
void tBlockDataClearData(SBlockData *pBlockData) {
void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->suid = 0;
pBlockData->uid = 0;
pBlockData->nRow = 0;
taosArrayClear(pBlockData->aIdx);
}
void tBlockDataClear(SBlockData *pBlockData) {
pBlockData->nRow = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
......@@ -1869,7 +1899,7 @@ _exit:
return code;
}
int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size) {
int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) {
int32_t code = 0;
// alloc
......@@ -1894,7 +1924,7 @@ int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int
}
// check
if (!taosCheckChecksumWhole(*ppOut, size)) {
if (toCheck && !taosCheckChecksumWhole(*ppOut, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _exit;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册