提交 16f55635 编写于 作者: H Hongze Cheng

more work

上级 6befe4ec
...@@ -45,7 +45,7 @@ typedef struct SBlockIdx SBlockIdx; ...@@ -45,7 +45,7 @@ typedef struct SBlockIdx SBlockIdx;
typedef struct SBlock SBlock; typedef struct SBlock SBlock;
typedef struct SBlockL SBlockL; typedef struct SBlockL SBlockL;
typedef struct SColData SColData; typedef struct SColData SColData;
typedef struct SBlockDataHdr SBlockDataHdr; typedef struct SDiskDataHdr SDiskDataHdr;
typedef struct SBlockData SBlockData; typedef struct SBlockData SBlockData;
typedef struct SDiskData SDiskData; typedef struct SDiskData SDiskData;
typedef struct SDelFile SDelFile; typedef struct SDelFile SDelFile;
...@@ -155,6 +155,9 @@ int32_t tDiskDataInit(SDiskData *pDiskData); ...@@ -155,6 +155,9 @@ int32_t tDiskDataInit(SDiskData *pDiskData);
void tDiskDataClear(SDiskData *pDiskData); void tDiskDataClear(SDiskData *pDiskData);
int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg); int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg);
int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData); int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData);
// SDiskDataHdr
int32_t tPutDiskDataHdr(uint8_t *p, void *ph);
int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
// SDelIdx // SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph);
...@@ -236,10 +239,8 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); ...@@ -236,10 +239,8 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf); int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf);
int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock *pBlock, uint8_t **ppBuf1, int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
uint8_t **ppBuf2, int8_t cmprAlg); int8_t cmprAlg, int8_t toLast, uint8_t **ppBuf);
int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockL *pBlockL, uint8_t **ppBuf1,
uint8_t **ppBuf2, int8_t cmprAlg);
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
// SDataFReader // SDataFReader
...@@ -403,9 +404,9 @@ typedef struct { ...@@ -403,9 +404,9 @@ typedef struct {
int8_t smaOn; int8_t smaOn;
int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE
int32_t szOrigin; // original column value size (only save for variant data type) int32_t szOrigin; // original column value size (only save for variant data type)
int32_t szBitmap; // bitmap size int32_t szBitmap; // bitmap size, 0 only for flag == HAS_VAL
int32_t szOffset; // size of offset, only for variant-length data type int32_t szOffset; // offset size, 0 only for non-variant-length type
int32_t szValue; // compressed column value size int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE)
int32_t offset; int32_t offset;
uint8_t **ppData; uint8_t **ppData;
} SBlockCol; } SBlockCol;
...@@ -414,28 +415,33 @@ typedef struct { ...@@ -414,28 +415,33 @@ typedef struct {
int64_t offset; // block data offset int64_t offset; // block data offset
int32_t szBlock; int32_t szBlock;
int32_t szKey; int32_t szKey;
} SSubBlock; } SBlockInfo;
typedef struct {
int64_t offset;
int32_t size;
} SSmaInfo;
struct SBlock { struct SBlock {
TSDBKEY minKey; TSDBKEY minKey;
TSDBKEY maxKey; TSDBKEY maxKey;
int64_t minVer; int64_t minVer;
int64_t maxVer; int64_t maxVer;
int32_t nRow; int32_t nRow;
int8_t hasDup; int8_t hasDup;
int8_t nSubBlock; int8_t nSubBlock;
SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS]; SBlockInfo aSubBlock[TSDB_MAX_SUBBLOCKS];
int64_t sOffset; // sma offset SSmaInfo smaInfo;
int32_t nSma; // sma size
}; };
struct SBlockL { struct SBlockL {
int64_t suid; int64_t suid;
int64_t minUid; int64_t minUid;
int64_t maxUid; int64_t maxUid;
int64_t minVer; int64_t minVer;
int64_t maxVer; int64_t maxVer;
int32_t nRow; int32_t nRow;
SBlockInfo bInfo;
}; };
struct SColData { struct SColData {
...@@ -498,15 +504,15 @@ struct SDelIdx { ...@@ -498,15 +504,15 @@ struct SDelIdx {
int64_t size; int64_t size;
}; };
struct SBlockDataHdr { struct SDiskDataHdr {
uint32_t delimiter; uint32_t delimiter;
int32_t nRow;
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
int32_t szUid; int32_t szUid;
int32_t szVer; int32_t szVer;
int32_t szKey; int32_t szKey;
int32_t szBlkCol; int32_t szBlkCol;
int32_t nRow;
int8_t cmprAlg; int8_t cmprAlg;
}; };
...@@ -575,6 +581,14 @@ struct SDelFWriter { ...@@ -575,6 +581,14 @@ struct SDelFWriter {
TdFilePtr pWriteH; TdFilePtr pWriteH;
}; };
struct SDiskData {
SDiskDataHdr hdr;
uint8_t **ppKey;
SArray *aBlockCol; // SArray<SBlockCol>
int32_t nBuf;
SArray *aBuf; // SArray<uint8_t*>
};
struct SDataFWriter { struct SDataFWriter {
STsdb *pTsdb; STsdb *pTsdb;
SDFileSet wSet; SDFileSet wSet;
...@@ -588,6 +602,8 @@ struct SDataFWriter { ...@@ -588,6 +602,8 @@ struct SDataFWriter {
SDataFile fData; SDataFile fData;
SLastFile fLast; SLastFile fLast;
SSmaFile fSma; SSmaFile fSma;
SDiskData dData;
}; };
struct STsdbReadSnap { struct STsdbReadSnap {
...@@ -596,24 +612,6 @@ struct STsdbReadSnap { ...@@ -596,24 +612,6 @@ struct STsdbReadSnap {
STsdbFS fs; STsdbFS fs;
}; };
struct SDiskData {
int8_t cmprAlg;
int32_t nRow;
int64_t suid;
int64_t uid;
int32_t szUid;
int32_t szVer;
int32_t szKey;
uint8_t *pUid;
uint8_t *pVer;
uint8_t *pKey;
SArray *aBlockCol; // SArray<SBlockCol>
int32_t nBuf;
SArray *aBuf; // SArray<uint8_t*>
uint8_t *pBuf;
};
// ========== inline functions ========== // ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1; TSDBKEY *pKey1 = (TSDBKEY *)p1;
......
...@@ -494,10 +494,11 @@ _exit: ...@@ -494,10 +494,11 @@ _exit:
} }
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) {
int32_t code = 0; int32_t code = 0;
SBlock block; SBlockData *pBlockData = &pCommitter->dWriter.bData;
SBlock block;
ASSERT(pCommitter->dWriter.bData.nRow > 0); ASSERT(pBlockData->nRow > 0);
if (pBlock) { if (pBlock) {
block = *pBlock; // as a subblock block = *pBlock; // as a subblock
...@@ -505,37 +506,84 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { ...@@ -505,37 +506,84 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) {
tBlockReset(&block); // as a new block tBlockReset(&block); // as a new block
} }
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bData, &block, NULL, NULL, // statistic
pCommitter->cmprAlg); block.nRow += pBlockData->nRow;
if (code) goto _exit; for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
if (iRow == 0) {
if (tsdbKeyCmprFn(&block.minKey, &key) > 0) {
block.minKey = key;
}
} else {
if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
block.hasDup = 1;
}
}
if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&block.maxKey, &key) < 0) {
block.maxKey = key;
}
block.minVer = TMIN(block.minVer, key.version);
block.maxVer = TMAX(block.maxVer, key.version);
}
// write
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &block.aSubBlock[block.nSubBlock++],
&block.smaInfo, pCommitter->cmprAlg, 0, NULL);
if (code) goto _err;
// put SBlock
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock); code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock);
if (code) goto _exit; if (code) goto _err;
tBlockDataClearData(&pCommitter->dWriter.bData); // clear
tBlockDataClearData(pBlockData);
_exit: return code;
_err:
tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
SBlockL blockL; SBlockL blockL;
SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
ASSERT(pCommitter->dWriter.bDatal.nRow > 0);
ASSERT(pBlockData->nRow > 0);
// statistic
blockL.suid = pBlockData->suid;
blockL.nRow = pBlockData->nRow;
blockL.minVer = VERSION_MAX;
blockL.maxVer = VERSION_MIN;
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
blockL.minVer = TMIN(blockL.minVer, pBlockData->aVersion[iRow]);
blockL.maxVer = TMIN(blockL.maxVer, pBlockData->aVersion[iRow]);
}
blockL.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0];
blockL.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
code = tsdbWriteLastBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, &blockL, NULL, NULL, // write
pCommitter->cmprAlg); code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1, NULL);
if (code) goto _exit; if (code) goto _err;
// push SBlockL
if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) { if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _err;
} }
tBlockDataClearData(&pCommitter->dWriter.bDatal); // clear
tBlockDataClearData(pBlockData);
_exit: return code;
_err:
tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
......
...@@ -217,31 +217,30 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { ...@@ -217,31 +217,30 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
// SBlock ====================================================== // SBlock ======================================================
void tBlockReset(SBlock *pBlock) { void tBlockReset(SBlock *pBlock) {
*pBlock = *pBlock = (SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN};
(SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVersion = VERSION_MAX, .maxVersion = VERSION_MIN};
} }
int32_t tPutBlock(uint8_t *p, void *ph) { int32_t tPutBlock(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
SBlock *pBlock = (SBlock *)ph; SBlock *pBlock = (SBlock *)ph;
n += tPutTSDBKEY(p ? p + n : p, &pBlock->minKey); n += tPutI64v(p ? p + n : p, pBlock->minKey.version);
n += tPutTSDBKEY(p ? p + n : p, &pBlock->maxKey); n += tPutI64(p ? p + n : p, pBlock->minKey.ts);
n += tPutI64v(p ? p + n : p, pBlock->maxKey.version);
n += tPutI64(p ? p + n : p, pBlock->maxKey.ts);
n += tPutI64v(p ? p + n : p, pBlock->minVer); n += tPutI64v(p ? p + n : p, pBlock->minVer);
n += tPutI64v(p ? p + n : p, pBlock->maxVer); n += tPutI64v(p ? p + n : p, pBlock->maxVer);
n += tPutI32v(p ? p + n : p, pBlock->nRow); n += tPutI32v(p ? p + n : p, pBlock->nRow);
n += tPutI8(p ? p + n : p, pBlock->hasDup); n += tPutI8(p ? p + n : p, pBlock->hasDup);
n += tPutI8(p ? p + n : p, pBlock->nSubBlock); n += tPutI8(p ? p + n : p, pBlock->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow);
n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlockCol);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szVersion);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szTSKEY);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock); n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].sOffset); n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szKey);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nSma); }
if (pBlock->nSubBlock == 1 && !pBlock->hasDup) {
n += tPutI64v(p ? p + n : p, pBlock->smaInfo.offset);
n += tPutI32v(p ? p + n : p, pBlock->smaInfo.size);
} }
return n; return n;
...@@ -251,23 +250,26 @@ int32_t tGetBlock(uint8_t *p, void *ph) { ...@@ -251,23 +250,26 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
SBlock *pBlock = (SBlock *)ph; SBlock *pBlock = (SBlock *)ph;
n += tGetTSDBKEY(p + n, &pBlock->minKey); n += tGetI64v(p + n, &pBlock->minKey.version);
n += tGetTSDBKEY(p + n, &pBlock->maxKey); n += tGetI64(p + n, &pBlock->minKey.ts);
n += tGetI64v(p + n, &pBlock->maxKey.version);
n += tGetI64(p + n, &pBlock->maxKey.ts);
n += tGetI64v(p + n, &pBlock->minVer); n += tGetI64v(p + n, &pBlock->minVer);
n += tGetI64v(p + n, &pBlock->maxVer); n += tGetI64v(p + n, &pBlock->maxVer);
n += tGetI32v(p + n, &pBlock->nRow); n += tGetI32v(p + n, &pBlock->nRow);
n += tGetI8(p + n, &pBlock->hasDup); n += tGetI8(p + n, &pBlock->hasDup);
n += tGetI8(p + n, &pBlock->nSubBlock); n += tGetI8(p + n, &pBlock->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].nRow);
n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlockCol);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szVersion);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szTSKEY);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock); n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].sOffset); n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szKey);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].nSma); }
if (pBlock->nSubBlock == 1 && !pBlock->hasDup) {
n += tGetI64v(p + n, &pBlock->smaInfo.offset);
n += tGetI32v(p + n, &pBlock->smaInfo.size);
} else {
pBlock->smaInfo.offset = 0;
pBlock->smaInfo.size = 0;
} }
return n; return n;
...@@ -290,8 +292,9 @@ bool tBlockHasSma(SBlock *pBlock) { ...@@ -290,8 +292,9 @@ bool tBlockHasSma(SBlock *pBlock) {
if (pBlock->nSubBlock > 1) return false; if (pBlock->nSubBlock > 1) return false;
if (pBlock->hasDup) return false; if (pBlock->hasDup) return false;
return pBlock->aSubBlock[0].nSma > 0; return pBlock->smaInfo.size > 0;
} }
// SBlockL ====================================================== // SBlockL ======================================================
int32_t tPutBlockL(uint8_t *p, void *ph) { int32_t tPutBlockL(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
...@@ -303,13 +306,9 @@ int32_t tPutBlockL(uint8_t *p, void *ph) { ...@@ -303,13 +306,9 @@ int32_t tPutBlockL(uint8_t *p, void *ph) {
n += tPutI64v(p ? p + n : p, pBlockL->minVer); n += tPutI64v(p ? p + n : p, pBlockL->minVer);
n += tPutI64v(p ? p + n : p, pBlockL->maxVer); n += tPutI64v(p ? p + n : p, pBlockL->maxVer);
n += tPutI32v(p ? p + n : p, pBlockL->nRow); n += tPutI32v(p ? p + n : p, pBlockL->nRow);
n += tPutI64v(p ? p + n : p, pBlockL->offset); n += tPutI64v(p ? p + n : p, pBlockL->bInfo.offset);
n += tPutI8(p ? p + n : p, pBlockL->cmprAlg); n += tPutI32v(p ? p + n : p, pBlockL->bInfo.szBlock);
n += tPutI32v(p ? p + n : p, pBlockL->szBlockCol); n += tPutI32v(p ? p + n : p, pBlockL->bInfo.szKey);
n += tPutI32v(p ? p + n : p, pBlockL->szUid);
n += tPutI32v(p ? p + n : p, pBlockL->szVer);
n += tPutI32v(p ? p + n : p, pBlockL->szTSKEY);
n += tPutI32v(p ? p + n : p, pBlockL->szBlock);
return n; return n;
} }
...@@ -324,13 +323,9 @@ int32_t tGetBlockL(uint8_t *p, void *ph) { ...@@ -324,13 +323,9 @@ int32_t tGetBlockL(uint8_t *p, void *ph) {
n += tGetI64v(p + n, &pBlockL->minVer); n += tGetI64v(p + n, &pBlockL->minVer);
n += tGetI64v(p + n, &pBlockL->maxVer); n += tGetI64v(p + n, &pBlockL->maxVer);
n += tGetI32v(p + n, &pBlockL->nRow); n += tGetI32v(p + n, &pBlockL->nRow);
n += tGetI64v(p + n, &pBlockL->offset); n += tGetI64v(p + n, &pBlockL->bInfo.offset);
n += tGetI8(p + n, &pBlockL->cmprAlg); n += tGetI32v(p + n, &pBlockL->bInfo.szBlock);
n += tGetI32v(p + n, &pBlockL->szBlockCol); n += tGetI32v(p + n, &pBlockL->bInfo.szKey);
n += tGetI32v(p + n, &pBlockL->szUid);
n += tGetI32v(p + n, &pBlockL->szVer);
n += tGetI32v(p + n, &pBlockL->szTSKEY);
n += tGetI32v(p + n, &pBlockL->szBlock);
return n; return n;
} }
...@@ -346,15 +341,25 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) { ...@@ -346,15 +341,25 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) {
n += tPutI8(p ? p + n : p, pBlockCol->type); n += tPutI8(p ? p + n : p, pBlockCol->type);
n += tPutI8(p ? p + n : p, pBlockCol->smaOn); n += tPutI8(p ? p + n : p, pBlockCol->smaOn);
n += tPutI8(p ? p + n : p, pBlockCol->flag); n += tPutI8(p ? p + n : p, pBlockCol->flag);
n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin);
if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_NULL) {
n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin); if (pBlockCol->flag != HAS_VALUE) {
n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap); n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap);
n += tPutI32v(p ? p + n : p, pBlockCol->szOffset); }
n += tPutI32v(p ? p + n : p, pBlockCol->szValue);
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
n += tPutI32v(p ? p + n : p, pBlockCol->szOffset);
}
if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) {
n += tPutI32v(p ? p + n : p, pBlockCol->szValue);
}
n += tPutI32v(p ? p + n : p, pBlockCol->offset); n += tPutI32v(p ? p + n : p, pBlockCol->offset);
} }
_exit:
return n; return n;
} }
...@@ -366,14 +371,28 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) { ...@@ -366,14 +371,28 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
n += tGetI8(p + n, &pBlockCol->type); n += tGetI8(p + n, &pBlockCol->type);
n += tGetI8(p + n, &pBlockCol->smaOn); n += tGetI8(p + n, &pBlockCol->smaOn);
n += tGetI8(p + n, &pBlockCol->flag); n += tGetI8(p + n, &pBlockCol->flag);
n += tGetI32v(p + n, &pBlockCol->szOrigin);
ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE));
pBlockCol->szBitmap = 0;
pBlockCol->szOffset = 0;
pBlockCol->szValue = 0;
pBlockCol->offset = 0;
if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_NULL) {
n += tGetI32v(p + n, &pBlockCol->szOrigin); if (pBlockCol->flag != HAS_VALUE) {
n += tGetI32v(p + n, &pBlockCol->szBitmap); n += tGetI32v(p + n, &pBlockCol->szBitmap);
n += tGetI32v(p + n, &pBlockCol->szOffset); }
n += tGetI32v(p + n, &pBlockCol->szValue);
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
n += tGetI32v(p + n, &pBlockCol->szOffset);
}
if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) {
n += tGetI32v(p + n, &pBlockCol->szValue);
}
n += tGetI32v(p + n, &pBlockCol->offset); n += tGetI32v(p + n, &pBlockCol->offset);
} }
...@@ -1650,22 +1669,35 @@ int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cm ...@@ -1650,22 +1669,35 @@ int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cm
taosArrayClear(pDiskData->aBlockCol); taosArrayClear(pDiskData->aBlockCol);
pDiskData->nBuf = 0; pDiskData->nBuf = 0;
// uid {
if (pDiskData->uid == 0) { pDiskData->ppKey = tDiskDataAllocBuf(pDiskData);
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, if (pDiskData->ppKey == NULL) {
&pDiskData->pUid, &pDiskData->szUid, &pDiskData->pBuf); code = TSDB_CODE_OUT_OF_MEMORY;
if (code) goto _exit; goto _exit;
} }
// version int32_t n = 0;
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, // uid
cmprAlg, &pDiskData->pVer, &pDiskData->szVer, &pDiskData->pBuf); if (pDiskData->uid == 0) {
if (code) goto _exit; code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
cmprAlg, pDiskData->ppKey, n, &pDiskData->szUid, NULL);
if (code) goto _exit;
} else {
pDiskData->szUid = 0;
}
n += pDiskData->szUid;
// tskey // version
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
cmprAlg, &pDiskData->pKey, &pDiskData->szKey, &pDiskData->pBuf); cmprAlg, pDiskData->ppKey, n, &pDiskData->szVer, NULL);
if (code) goto _exit; if (code) goto _exit;
n += pDiskData->szVer;
// tskey
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
cmprAlg, pDiskData->ppKey, &pDiskData->szKey, NULL);
if (code) goto _exit;
}
// columns // columns
int32_t offset = 0; int32_t offset = 0;
...@@ -1689,7 +1721,7 @@ int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cm ...@@ -1689,7 +1721,7 @@ int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cm
} }
// compress // compress
code = tsdbCmprColData(pColData, cmprAlg, &blockCol); code = tsdbCmprColData(pColData, cmprAlg, &blockCol, NULL);
if (code) goto _exit; if (code) goto _exit;
// update offset // update offset
...@@ -1709,6 +1741,41 @@ _exit: ...@@ -1709,6 +1741,41 @@ _exit:
int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg); int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg);
int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData); int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData);
// SDiskDataHdr ==============================
int32_t tPutDiskDataHdr(uint8_t *p, void *ph) {
int32_t n = 0;
SDiskDataHdr *pHdr = (SDiskDataHdr *)ph;
n += tPutU32(p ? p + n : p, pHdr->delimiter);
n += tPutI64(p ? p + n : p, pHdr->suid);
n += tPutI64(p ? p + n : p, pHdr->uid);
n += tPutI32v(p ? p + n : p, pHdr->szUid);
n += tPutI32v(p ? p + n : p, pHdr->szVer);
n += tPutI32v(p ? p + n : p, pHdr->szKey);
n += tPutI32v(p ? p + n : p, pHdr->szBlkCol);
n += tPutI32v(p ? p + n : p, pHdr->nRow);
n += tPutI8(p ? p + n : p, pHdr->cmprAlg);
return n;
}
int32_t tGetDiskDataHdr(uint8_t *p, void *ph) {
int32_t n = 0;
SDiskDataHdr *pHdr = (SDiskDataHdr *)ph;
n += tGetU32(p + n, &pHdr->delimiter);
n += tGetI64(p + n, &pHdr->suid);
n += tGetI64(p + n, &pHdr->uid);
n += tGetI32v(p + n, &pHdr->szUid);
n += tGetI32v(p + n, &pHdr->szVer);
n += tGetI32v(p + n, &pHdr->szKey);
n += tGetI32v(p + n, &pHdr->szBlkCol);
n += tGetI32v(p + n, &pHdr->nRow);
n += tGetI8(p + n, &pHdr->cmprAlg);
return n;
}
// ALGORITHM ============================== // ALGORITHM ==============================
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
SColVal colVal; SColVal colVal;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册