/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "tsdb.h" typedef struct SDiskColBuilder SDiskColBuilder; struct SDiskColBuilder { int16_t cid; int8_t type; uint8_t cmprAlg; uint8_t calcSma; int8_t flag; int32_t nVal; uint8_t *pBitMap; int32_t offset; SCompressor *pOffC; SCompressor *pValC; SColumnDataAgg sma; uint8_t minSet; uint8_t maxSet; uint8_t *aBuf[2]; }; // SDiskData ================================================ static int32_t tDiskDataDestroy(SDiskData *pDiskData) { int32_t code = 0; pDiskData->aDiskCol = taosArrayDestroy(pDiskData->aDiskCol); return code; } // SDiskColBuilder ================================================ #define tDiskColBuilderCreate() \ (SDiskColBuilder) { 0 } static int32_t tDiskColBuilderDestroy(SDiskColBuilder *pBuilder) { int32_t code = 0; tFree(pBuilder->pBitMap); if (pBuilder->pOffC) tCompressorDestroy(pBuilder->pOffC); if (pBuilder->pValC) tCompressorDestroy(pBuilder->pValC); for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) { tFree(pBuilder->aBuf[iBuf]); } return code; } static int32_t tDiskColBuilderInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg, uint8_t calcSma) { int32_t code = 0; pBuilder->cid = cid; pBuilder->type = type; pBuilder->cmprAlg = cmprAlg; pBuilder->calcSma = IS_VAR_DATA_TYPE(type) ? 0 : calcSma; pBuilder->flag = 0; pBuilder->nVal = 0; pBuilder->offset = 0; if (IS_VAR_DATA_TYPE(type)) { if (pBuilder->pOffC == NULL && (code = tCompressorCreate(&pBuilder->pOffC))) return code; code = tCompressStart(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg); if (code) return code; } if (pBuilder->pValC == NULL && (code = tCompressorCreate(&pBuilder->pValC))) return code; code = tCompressStart(pBuilder->pValC, type, cmprAlg); if (code) return code; if (pBuilder->calcSma) { pBuilder->sma = (SColumnDataAgg){.colId = cid}; pBuilder->minSet = 0; pBuilder->maxSet = 0; } return code; } static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SDiskCol *pDiskCol) { int32_t code = 0; ASSERT(pBuilder->flag && pBuilder->flag != HAS_NONE); *pDiskCol = (SDiskCol){(SBlockCol){.cid = pBuilder->cid, .type = pBuilder->type, .smaOn = pBuilder->calcSma, .flag = pBuilder->flag, .szOrigin = 0, .szBitmap = 0, .szOffset = 0, .szValue = 0, .offset = 0}, .pBit = NULL, .pOff = NULL, .pVal = NULL, .agg = pBuilder->sma}; if (pBuilder->flag == HAS_NULL) return code; // BITMAP if (pBuilder->flag != HAS_VALUE) { int32_t nBit; if (pBuilder->flag == (HAS_VALUE | HAS_NULL | HAS_NONE)) { nBit = BIT2_SIZE(pBuilder->nVal); } else { nBit = BIT1_SIZE(pBuilder->nVal); } code = tRealloc(&pBuilder->aBuf[0], nBit + COMP_OVERFLOW_BYTES); if (code) return code; code = tRealloc(&pBuilder->aBuf[1], nBit + COMP_OVERFLOW_BYTES); if (code) return code; pDiskCol->bCol.szBitmap = tsCompressTinyint(pBuilder->pBitMap, nBit, nBit, pBuilder->aBuf[0], nBit + COMP_OVERFLOW_BYTES, pBuilder->cmprAlg, pBuilder->aBuf[1], nBit + COMP_OVERFLOW_BYTES); pDiskCol->pBit = pBuilder->aBuf[0]; } // OFFSET if (IS_VAR_DATA_TYPE(pBuilder->type)) { code = tCompressEnd(pBuilder->pOffC, &pDiskCol->pOff, &pDiskCol->bCol.szOffset, NULL); if (code) return code; } // VALUE if (pBuilder->flag != (HAS_NULL | HAS_NONE)) { code = tCompressEnd(pBuilder->pValC, &pDiskCol->pVal, &pDiskCol->bCol.szValue, &pDiskCol->bCol.szOrigin); if (code) return code; } return code; } static FORCE_INLINE int32_t tDiskColPutValue(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; if (IS_VAR_DATA_TYPE(pColVal->type)) { code = tCompress(pBuilder->pOffC, &pBuilder->offset, sizeof(int32_t)); if (code) return code; pBuilder->offset += pColVal->value.nData; code = tCompress(pBuilder->pValC, pColVal->value.pData, pColVal->value.nData); if (code) return code; } else { code = tCompress(pBuilder->pValC, &pColVal->value.val, tDataTypes[pColVal->type].bytes); if (code) return code; } return code; } static FORCE_INLINE int32_t tDiskColAddVal00(SDiskColBuilder *pBuilder, SColVal *pColVal) { pBuilder->flag = HAS_VALUE; return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal01(SDiskColBuilder *pBuilder, SColVal *pColVal) { pBuilder->flag = HAS_NONE; return 0; } static FORCE_INLINE int32_t tDiskColAddVal02(SDiskColBuilder *pBuilder, SColVal *pColVal) { pBuilder->flag = HAS_NULL; return 0; } static FORCE_INLINE int32_t tDiskColAddVal10(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; // bit map int32_t nBit = BIT1_SIZE(pBuilder->nVal + 1); code = tRealloc(&pBuilder->pBitMap, nBit); if (code) return code; memset(pBuilder->pBitMap, 0, nBit); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1); // value pBuilder->flag |= HAS_VALUE; SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) { code = tDiskColPutValue(pBuilder, &cv); if (code) return code; } return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal12(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; int32_t nBit = BIT1_SIZE(pBuilder->nVal + 1); code = tRealloc(&pBuilder->pBitMap, nBit); if (code) return code; memset(pBuilder->pBitMap, 0, nBit); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1); pBuilder->flag |= HAS_NULL; return code; } static FORCE_INLINE int32_t tDiskColAddVal20(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; int32_t nBit = BIT1_SIZE(pBuilder->nVal + 1); code = tRealloc(&pBuilder->pBitMap, nBit); if (code) return code; pBuilder->flag |= HAS_VALUE; memset(pBuilder->pBitMap, 0, nBit); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1); SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) { code = tDiskColPutValue(pBuilder, &cv); if (code) return code; } return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal21(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; int32_t nBit = BIT1_SIZE(pBuilder->nVal + 1); code = tRealloc(&pBuilder->pBitMap, nBit); if (code) return code; pBuilder->flag |= HAS_NONE; memset(pBuilder->pBitMap, 255, nBit); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0); return code; } static FORCE_INLINE int32_t tDiskColAddVal30(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; pBuilder->flag |= HAS_VALUE; uint8_t *pBitMap = NULL; code = tRealloc(&pBitMap, BIT2_SIZE(pBuilder->nVal + 1)); if (code) return code; for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) { SET_BIT2(pBitMap, iVal, GET_BIT1(pBuilder->pBitMap, iVal)); } SET_BIT2(pBitMap, pBuilder->nVal, 2); tFree(pBuilder->pBitMap); pBuilder->pBitMap = pBitMap; SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) { code = tDiskColPutValue(pBuilder, &cv); if (code) return code; } return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal31(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; code = tRealloc(&pBuilder->pBitMap, BIT1_SIZE(pBuilder->nVal + 1)); if (code) return code; SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0); return code; } static FORCE_INLINE int32_t tDiskColAddVal32(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; code = tRealloc(&pBuilder->pBitMap, BIT1_SIZE(pBuilder->nVal + 1)); if (code) return code; SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1); return code; } static FORCE_INLINE int32_t tDiskColAddVal40(SDiskColBuilder *pBuilder, SColVal *pColVal) { return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal41(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; pBuilder->flag |= HAS_NONE; int32_t nBit = BIT1_SIZE(pBuilder->nVal + 1); code = tRealloc(&pBuilder->pBitMap, nBit); if (code) return code; memset(pBuilder->pBitMap, 255, nBit); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0); return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal42(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; pBuilder->flag |= HAS_NULL; int32_t nBit = BIT1_SIZE(pBuilder->nVal + 1); code = tRealloc(&pBuilder->pBitMap, nBit); if (code) return code; memset(pBuilder->pBitMap, 255, nBit); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0); return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal50(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; code = tRealloc(&pBuilder->pBitMap, BIT1_SIZE(pBuilder->nVal + 1)); if (code) return code; SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1); return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal51(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; code = tRealloc(&pBuilder->pBitMap, BIT1_SIZE(pBuilder->nVal + 1)); if (code) return code; SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0); return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal52(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; pBuilder->flag |= HAS_NULL; uint8_t *pBitMap = NULL; code = tRealloc(&pBitMap, BIT2_SIZE(pBuilder->nVal + 1)); if (code) return code; for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) { SET_BIT2(pBitMap, iVal, GET_BIT1(pBuilder->pBitMap, iVal) ? 2 : 0); } SET_BIT2(pBitMap, pBuilder->nVal, 1); tFree(pBuilder->pBitMap); pBuilder->pBitMap = pBitMap; return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal60(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; code = tRealloc(&pBuilder->pBitMap, BIT1_SIZE(pBuilder->nVal + 1)); if (code) return code; SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1); return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal61(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; pBuilder->flag |= HAS_NONE; uint8_t *pBitMap = NULL; code = tRealloc(&pBitMap, BIT2_SIZE(pBuilder->nVal + 1)); if (code) return code; for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) { SET_BIT2(pBitMap, iVal, GET_BIT1(pBuilder->pBitMap, iVal) ? 2 : 1); } SET_BIT2(pBitMap, pBuilder->nVal, 0); tFree(pBuilder->pBitMap); pBuilder->pBitMap = pBitMap; return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal62(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; code = tRealloc(&pBuilder->pBitMap, BIT1_SIZE(pBuilder->nVal + 1)); if (code) return code; SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0); return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal70(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; code = tRealloc(&pBuilder->pBitMap, BIT2_SIZE(pBuilder->nVal + 1)); if (code) return code; SET_BIT2(pBuilder->pBitMap, pBuilder->nVal, 2); return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal71(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; code = tRealloc(&pBuilder->pBitMap, BIT2_SIZE(pBuilder->nVal + 1)); if (code) return code; SET_BIT2(pBuilder->pBitMap, pBuilder->nVal, 0); return tDiskColPutValue(pBuilder, pColVal); } static FORCE_INLINE int32_t tDiskColAddVal72(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; code = tRealloc(&pBuilder->pBitMap, BIT2_SIZE(pBuilder->nVal + 1)); if (code) return code; SET_BIT2(pBuilder->pBitMap, pBuilder->nVal, 1); return tDiskColPutValue(pBuilder, pColVal); } static int32_t (*tDiskColAddValImpl[8][3])(SDiskColBuilder *pBuilder, SColVal *pColVal) = { {tDiskColAddVal00, tDiskColAddVal01, tDiskColAddVal02}, // 0 {tDiskColAddVal10, NULL, tDiskColAddVal12}, // HAS_NONE {tDiskColAddVal20, tDiskColAddVal21, NULL}, // HAS_NULL {tDiskColAddVal30, tDiskColAddVal31, tDiskColAddVal32}, // HAS_NULL|HAS_NONE {tDiskColAddVal40, tDiskColAddVal41, tDiskColAddVal42}, // HAS_VALUE {tDiskColAddVal50, tDiskColAddVal51, tDiskColAddVal52}, // HAS_VALUE|HAS_NONE {tDiskColAddVal60, tDiskColAddVal61, tDiskColAddVal62}, // HAS_VALUE|HAS_NULL {tDiskColAddVal70, tDiskColAddVal71, tDiskColAddVal72} // HAS_VALUE|HAS_NULL|HAS_NONE }; // extern void (*tSmaUpdateImpl[])(SColumnDataAgg *pColAgg, SColVal *pColVal, uint8_t *minSet, uint8_t *maxSet); static int32_t tDiskColAddVal(SDiskColBuilder *pBuilder, SColVal *pColVal) { int32_t code = 0; if (pBuilder->calcSma) { if (COL_VAL_IS_VALUE(pColVal)) { // tSmaUpdateImpl[pBuilder->type](&pBuilder->sma, pColVal, &pBuilder->minSet, &pBuilder->maxSet); } else { pBuilder->sma.numOfNull++; } } if (tDiskColAddValImpl[pBuilder->flag][pColVal->flag]) { code = tDiskColAddValImpl[pBuilder->flag][pColVal->flag](pBuilder, pColVal); if (code) return code; } pBuilder->nVal++; return code; } // SDiskDataBuilder ================================================ int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder) { int32_t code = 0; *ppBuilder = (SDiskDataBuilder *)taosMemoryCalloc(1, sizeof(SDiskDataBuilder)); if (*ppBuilder == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; } return code; } void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) { if (pBuilder == NULL) return NULL; if (pBuilder->pUidC) tCompressorDestroy(pBuilder->pUidC); if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC); if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC); if (pBuilder->aBuilder) { for (int32_t iBuilder = 0; iBuilder < taosArrayGetSize(pBuilder->aBuilder); iBuilder++) { SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder); tDiskColBuilderDestroy(pDCBuilder); } taosArrayDestroy(pBuilder->aBuilder); } for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) { tFree(pBuilder->aBuf[iBuf]); } tDiskDataDestroy(&pBuilder->dd); taosMemoryFree(pBuilder); return NULL; } int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg, uint8_t calcSma) { int32_t code = 0; ASSERT(pId->suid || pId->uid); pBuilder->suid = pId->suid; pBuilder->uid = pId->uid; pBuilder->nRow = 0; pBuilder->cmprAlg = cmprAlg; pBuilder->calcSma = calcSma; pBuilder->bi = (SBlkInfo){.minUid = INT64_MAX, .maxUid = INT64_MIN, .minKey = TSKEY_MAX, .maxKey = TSKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN, .minTKey = TSDBKEY_MAX, .maxTKey = TSDBKEY_MIN}; if (pBuilder->pUidC == NULL && (code = tCompressorCreate(&pBuilder->pUidC))) return code; code = tCompressStart(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg); if (code) return code; if (pBuilder->pVerC == NULL && (code = tCompressorCreate(&pBuilder->pVerC))) return code; code = tCompressStart(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg); if (code) return code; if (pBuilder->pKeyC == NULL && (code = tCompressorCreate(&pBuilder->pKeyC))) return code; code = tCompressStart(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg); if (code) return code; if (pBuilder->aBuilder == NULL) { pBuilder->aBuilder = taosArrayInit(pTSchema->numOfCols - 1, sizeof(SDiskColBuilder)); if (pBuilder->aBuilder == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; } } pBuilder->nBuilder = 0; for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { STColumn *pTColumn = &pTSchema->columns[iCol]; if (pBuilder->nBuilder >= taosArrayGetSize(pBuilder->aBuilder)) { SDiskColBuilder dc = tDiskColBuilderCreate(); if (taosArrayPush(pBuilder->aBuilder, &dc) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; } } SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, pBuilder->nBuilder); code = tDiskColBuilderInit(pDCBuilder, pTColumn->colId, pTColumn->type, cmprAlg, (calcSma && (pTColumn->flags & COL_SMA_ON))); if (code) return code; pBuilder->nBuilder++; } return code; } int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder) { int32_t code = 0; pBuilder->suid = 0; pBuilder->uid = 0; pBuilder->nRow = 0; return code; } int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) { int32_t code = 0; ASSERT(pBuilder->suid || pBuilder->uid); ASSERT(pId->suid == pBuilder->suid); TSDBKEY kRow = TSDBROW_KEY(pRow); if (tsdbKeyCmprFn(&pBuilder->bi.minTKey, &kRow) > 0) pBuilder->bi.minTKey = kRow; if (tsdbKeyCmprFn(&pBuilder->bi.maxTKey, &kRow) < 0) pBuilder->bi.maxTKey = kRow; // uid if (pBuilder->uid && pBuilder->uid != pId->uid) { ASSERT(pBuilder->suid); for (int32_t iRow = 0; iRow < pBuilder->nRow; iRow++) { code = tCompress(pBuilder->pUidC, &pBuilder->uid, sizeof(int64_t)); if (code) return code; } pBuilder->uid = 0; } if (pBuilder->uid == 0) { code = tCompress(pBuilder->pUidC, &pId->uid, sizeof(int64_t)); if (code) return code; } if (pBuilder->bi.minUid > pId->uid) pBuilder->bi.minUid = pId->uid; if (pBuilder->bi.maxUid < pId->uid) pBuilder->bi.maxUid = pId->uid; // version code = tCompress(pBuilder->pVerC, &kRow.version, sizeof(int64_t)); if (code) return code; if (pBuilder->bi.minVer > kRow.version) pBuilder->bi.minVer = kRow.version; if (pBuilder->bi.maxVer < kRow.version) pBuilder->bi.maxVer = kRow.version; // TSKEY code = tCompress(pBuilder->pKeyC, &kRow.ts, sizeof(int64_t)); if (code) return code; if (pBuilder->bi.minKey > kRow.ts) pBuilder->bi.minKey = kRow.ts; if (pBuilder->bi.maxKey < kRow.ts) pBuilder->bi.maxKey = kRow.ts; SRowIter iter = {0}; tRowIterInit(&iter, pRow, pTSchema); SColVal *pColVal = tRowIterNext(&iter); for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) { SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder); while (pColVal && pColVal->cid < pDCBuilder->cid) { pColVal = tRowIterNext(&iter); } if (pColVal && pColVal->cid == pDCBuilder->cid) { code = tDiskColAddVal(pDCBuilder, pColVal); if (code) return code; pColVal = tRowIterNext(&iter); } else { code = tDiskColAddVal(pDCBuilder, &COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type)); if (code) return code; } } pBuilder->nRow++; return code; } int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, const SBlkInfo **ppBlkInfo) { int32_t code = 0; ASSERT(pBuilder->nRow); *ppDiskData = NULL; *ppBlkInfo = NULL; SDiskData *pDiskData = &pBuilder->dd; // reset SDiskData pDiskData->hdr = (SDiskDataHdr){.delimiter = TSDB_FILE_DLMT, .fmtVer = 0, .suid = pBuilder->suid, .uid = pBuilder->uid, .szUid = 0, .szVer = 0, .szKey = 0, .szBlkCol = 0, .nRow = pBuilder->nRow, .cmprAlg = pBuilder->cmprAlg}; pDiskData->pUid = NULL; pDiskData->pVer = NULL; pDiskData->pKey = NULL; // UID if (pBuilder->uid == 0) { code = tCompressEnd(pBuilder->pUidC, &pDiskData->pUid, &pDiskData->hdr.szUid, NULL); if (code) return code; } // VERSION code = tCompressEnd(pBuilder->pVerC, &pDiskData->pVer, &pDiskData->hdr.szVer, NULL); if (code) return code; // TSKEY code = tCompressEnd(pBuilder->pKeyC, &pDiskData->pKey, &pDiskData->hdr.szKey, NULL); if (code) return code; // aDiskCol if (pDiskData->aDiskCol) { taosArrayClear(pDiskData->aDiskCol); } else { pDiskData->aDiskCol = taosArrayInit(pBuilder->nBuilder, sizeof(SDiskCol)); if (pDiskData->aDiskCol == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; } } int32_t offset = 0; for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) { SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder); if (pDCBuilder->flag == HAS_NONE) continue; SDiskCol dCol; code = tGnrtDiskCol(pDCBuilder, &dCol); if (code) return code; dCol.bCol.offset = offset; offset = offset + dCol.bCol.szBitmap + dCol.bCol.szOffset + dCol.bCol.szValue; if (taosArrayPush(pDiskData->aDiskCol, &dCol) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; } pDiskData->hdr.szBlkCol += tPutBlockCol(NULL, &dCol.bCol); } *ppDiskData = pDiskData; *ppBlkInfo = &pBuilder->bi; return code; }