提交 4bfdfe89 编写于 作者: H Hongze Cheng

adjust api

上级 f16029ad
...@@ -150,7 +150,7 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs); ...@@ -150,7 +150,7 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs);
int32_t tBlockDataCreate(SBlockData *pBlockData); int32_t tBlockDataCreate(SBlockData *pBlockData);
void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear); void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema); int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid);
int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom); int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataReset(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
......
...@@ -612,7 +612,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { ...@@ -612,7 +612,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
/* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */ /* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */
tBlockDataReset(state->pBlockData); tBlockDataReset(state->pBlockData);
code = tBlockDataInit(state->pBlockData, state->suid, state->uid, state->pTSchema); TABLEID tid = {.suid = state->suid, .uid = state->uid};
code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0);
if (code) goto _err; if (code) goto _err;
code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData); code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData);
......
...@@ -1305,7 +1305,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { ...@@ -1305,7 +1305,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) {
if (!pBDatal->suid && !pBDatal->uid) { if (!pBDatal->suid && !pBDatal->uid) {
ASSERT(pCommitter->skmTable.suid == id.suid); ASSERT(pCommitter->skmTable.suid == id.suid);
ASSERT(pCommitter->skmTable.uid == id.uid); ASSERT(pCommitter->skmTable.uid == id.uid);
code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid};
code = tBlockDataInit(pBDatal, &tid, pCommitter->skmTable.pTSchema, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
} }
...@@ -1428,9 +1429,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { ...@@ -1428,9 +1429,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
// impl // impl
code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable); code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
if (code) goto _err; if (code) goto _err;
code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); code = tBlockDataInit(&pCommitter->dReader.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
if (code) goto _err; if (code) goto _err;
code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); code = tBlockDataInit(&pCommitter->dWriter.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
if (code) goto _err; if (code) goto _err;
/* merge with data in .data file */ /* merge with data in .data file */
......
...@@ -1741,7 +1741,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1741,7 +1741,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsLast = getCurrentKeyInLastBlock(pLastBlockReader); tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
} }
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN; int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow); TSDBKEY ik = TSDBROW_KEY(piRow);
...@@ -2009,12 +2009,12 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { ...@@ -2009,12 +2009,12 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
} }
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
if (pBlockData->nRow > 0) { if (pBlockData->nRow > 0) {
ASSERT(pBlockData->nRow == pDumpInfo->totalRows); ASSERT(pBlockData->nRow == pDumpInfo->totalRows);
} }
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
} }
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
...@@ -2452,7 +2452,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2452,7 +2452,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
code = buildComposedDataBlock(pReader); code = buildComposedDataBlock(pReader);
} else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
tBlockDataReset(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema); TABLEID tid = {.suid = pReader->suid, .uid = pScanInfo->uid};
code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2932,7 +2933,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -2932,7 +2933,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
// 3. load the neighbor block, and set it to be the currently accessed file data block // 3. load the neighbor block, and set it to be the currently accessed file data block
tBlockDataReset(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pFBlock->uid, pReader->pSchema); TABLEID tid = {.suid = pReader->suid, .uid = pFBlock->uid};
int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3684,7 +3686,8 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { ...@@ -3684,7 +3686,8 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
tBlockDataReset(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema); TABLEID tid = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
return NULL; return NULL;
......
...@@ -926,12 +926,13 @@ _err: ...@@ -926,12 +926,13 @@ _err:
return code; return code;
} }
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData) { static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData,
int32_t iStt) {
int32_t code = 0; int32_t code = 0;
tBlockDataClear(pBlockData); tBlockDataClear(pBlockData);
STsdbFD *pFD = pReader->pDataFD; STsdbFD *pFD = (iStt < 0) ? pReader->pDataFD : pReader->aSttFD[iStt];
// uid + version + tskey // uid + version + tskey
code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey); code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey);
...@@ -1070,9 +1071,12 @@ _err: ...@@ -1070,9 +1071,12 @@ _err:
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData); code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData, -1);
if (code) goto _err; if (code) goto _err;
ASSERT(pDataBlk->nSubBlock == 1);
#if 0
if (pDataBlk->nSubBlock > 1) { if (pDataBlk->nSubBlock > 1) {
SBlockData bData1; SBlockData bData1;
SBlockData bData2; SBlockData bData2;
...@@ -1113,6 +1117,7 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData ...@@ -1113,6 +1117,7 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData
tBlockDataDestroy(&bData1, 1); tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1); tBlockDataDestroy(&bData2, 1);
} }
#endif
return code; return code;
...@@ -1124,6 +1129,10 @@ _err: ...@@ -1124,6 +1129,10 @@ _err:
int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) { int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
code = tsdbReadBlockDataImpl(pReader, &pSttBlk->bInfo, pBlockData, iStt);
if (code) goto _err;
#if 0
// alloc // alloc
code = tRealloc(&pReader->aBuf[0], pSttBlk->bInfo.szBlock); code = tRealloc(&pReader->aBuf[0], pSttBlk->bInfo.szBlock);
if (code) goto _err; if (code) goto _err;
...@@ -1136,6 +1145,7 @@ int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, ...@@ -1136,6 +1145,7 @@ int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk,
code = tDecmprBlockData(pReader->aBuf[0], pSttBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); code = tDecmprBlockData(pReader->aBuf[0], pSttBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
if (code) goto _err; if (code) goto _err;
#endif
return code; return code;
_err: _err:
......
...@@ -319,7 +319,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -319,7 +319,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable); code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable);
if (code) goto _err; if (code) goto _err;
code = tBlockDataInit(pBlockData, id.suid, id.uid, pReader->skmTable.pTSchema); code = tBlockDataInit(pBlockData, &id, pReader->skmTable.pTSchema, NULL, 0);
if (code) goto _err; if (code) goto _err;
while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) { while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) {
...@@ -715,7 +715,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI ...@@ -715,7 +715,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI
if (code) goto _err; if (code) goto _err;
tMapDataReset(&pWriter->dWriter.mDataBlk); tMapDataReset(&pWriter->dWriter.mDataBlk);
code = tBlockDataInit(&pWriter->dWriter.bData, pId->suid, pId->uid, pWriter->skmTable.pTSchema); code = tBlockDataInit(&pWriter->dWriter.bData, pId, pWriter->skmTable.pTSchema, NULL, 0);
if (code) goto _err; if (code) goto _err;
return code; return code;
...@@ -1000,7 +1000,8 @@ static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { ...@@ -1000,7 +1000,8 @@ static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) {
code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable); code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable);
if (code) goto _err; if (code) goto _err;
code = tBlockDataInit(pBData, pWriter->id.suid, pWriter->id.suid ? 0 : pWriter->id.uid, pWriter->skmTable.pTSchema); TABLEID tid = {.suid = pWriter->id.suid, .uid = pWriter->id.suid ? 0 : pWriter->id.uid};
code = tBlockDataInit(pBData, &tid, pWriter->skmTable.pTSchema, NULL, 0);
if (code) goto _err; if (code) goto _err;
} }
......
...@@ -948,24 +948,47 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) { ...@@ -948,24 +948,47 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) {
pBlockData->aColData = NULL; pBlockData->aColData = NULL;
} }
int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema) { int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) {
int32_t code = 0; int32_t code = 0;
ASSERT(suid || uid); ASSERT(pId->suid || pId->uid);
pBlockData->suid = suid; pBlockData->suid = pId->suid;
pBlockData->uid = uid; pBlockData->uid = pId->uid;
pBlockData->nRow = 0; pBlockData->nRow = 0;
taosArrayClear(pBlockData->aIdx); taosArrayClear(pBlockData->aIdx);
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { if (aCid) {
int32_t iColumn = 1;
STColumn *pTColumn = &pTSchema->columns[iColumn]; STColumn *pTColumn = &pTSchema->columns[iColumn];
for (int32_t iCid = 0; iCid < nCid; iCid++) {
while (pTColumn && pTColumn->colId < aCid[iCid]) {
iColumn++;
pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL;
}
SColData *pColData; if (pTColumn == NULL) {
code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); break;
if (code) goto _exit; } else if (pTColumn->colId == aCid[iCid]) {
SColData *pColData;
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
if (code) goto _exit;
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); iColumn++;
pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL;
}
}
} else {
for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) {
STColumn *pTColumn = &pTSchema->columns[iColumn];
SColData *pColData;
code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
}
} }
_exit: _exit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册