提交 cc181e0b 编写于 作者: H Hongze Cheng

refact

上级 1abc5911
......@@ -394,9 +394,9 @@ typedef struct {
int64_t nRow;
int8_t cmprAlg;
int64_t offset;
int64_t vsize; // VERSION size
int64_t ksize; // TSKEY size
int64_t bsize; // total block size
int64_t szVersion; // VERSION size
int64_t szTSKEY; // TSKEY size
int64_t szBlock; // total block size
SMapData mBlockCol; // SMapData<SBlockCol>
} SSubBlock;
......
......@@ -635,7 +635,7 @@ _err:
static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) {
int32_t code = 0;
int64_t size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
int64_t n;
if (!taosCheckChecksumWhole(pBuf, size)) {
......@@ -649,15 +649,15 @@ static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBl
if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
ASSERT(pSubBlock->vsize == sizeof(int64_t) * pSubBlock->nRow);
ASSERT(pSubBlock->ksize == sizeof(TSKEY) * pSubBlock->nRow);
ASSERT(pSubBlock->szVersion == sizeof(int64_t) * pSubBlock->nRow);
ASSERT(pSubBlock->szTSKEY == sizeof(TSKEY) * pSubBlock->nRow);
// VERSION
memcpy(pBlockData->aVersion, pBuf, pSubBlock->vsize);
memcpy(pBlockData->aVersion, pBuf, pSubBlock->szVersion);
// TSKEY
pBuf = pBuf + pSubBlock->vsize;
memcpy(pBlockData->aTSKEY, pBuf + pSubBlock->vsize, pSubBlock->ksize);
pBuf = pBuf + pSubBlock->szVersion;
memcpy(pBlockData->aTSKEY, pBuf + pSubBlock->szVersion, pSubBlock->szTSKEY);
} else {
size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
......@@ -666,7 +666,7 @@ static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBl
}
// VERSION
n = tsDecompressBigint(pBuf, pSubBlock->vsize, pSubBlock->nRow, (char *)pBlockData->aVersion,
n = tsDecompressBigint(pBuf, pSubBlock->szVersion, pSubBlock->nRow, (char *)pBlockData->aVersion,
sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
......@@ -674,8 +674,8 @@ static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBl
}
// TSKEY
pBuf = pBuf + pSubBlock->vsize;
n = tsDecompressTimestamp(pBuf, pSubBlock->ksize, pSubBlock->nRow, (char *)pBlockData->aTSKEY,
pBuf = pBuf + pSubBlock->szVersion;
n = tsDecompressTimestamp(pBuf, pSubBlock->szTSKEY, pSubBlock->nRow, (char *)pBlockData->aTSKEY,
sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
......@@ -768,7 +768,7 @@ static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx,
// TSDBKEY
offset = pSubBlock->offset + sizeof(SBlockDataHdr);
size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
......@@ -809,8 +809,8 @@ static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx,
if (code) goto _err;
}
} else {
offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM) +
pBlockCol->offset;
offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->szVersion + pSubBlock->szTSKEY +
sizeof(TSCKSUM) + pBlockCol->offset;
size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size);
......@@ -909,7 +909,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
tBlockDataReset(pBlockData);
// realloc
code = tsdbRealloc(ppBuf1, pSubBlock->bsize);
code = tsdbRealloc(ppBuf1, pSubBlock->szBlock);
if (code) goto _err;
// seek
......@@ -920,11 +920,11 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
}
// read
n = taosReadFile(pFD, *ppBuf1, pSubBlock->bsize);
n = taosReadFile(pFD, *ppBuf1, pSubBlock->szBlock);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < pSubBlock->bsize) {
} else if (n < pSubBlock->szBlock) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
......@@ -935,7 +935,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, p, ppBuf2);
if (code) goto _err;
p = p + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
p = p + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
SColData *pColData;
......@@ -1411,7 +1411,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
} else {
pSubBlock->offset = pWriter->wSet.fData.size;
}
pSubBlock->bsize = 0;
pSubBlock->szBlock = 0;
// HDR
n = taosWriteFile(pFileFD, &hdr, sizeof(hdr));
......@@ -1419,29 +1419,29 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pSubBlock->bsize += n;
pSubBlock->szBlock += n;
// TSDBKEY
if (cmprAlg == NO_COMPRESSION) {
cksm = 0;
// version
pSubBlock->vsize = sizeof(int64_t) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aVersion, pSubBlock->vsize);
pSubBlock->szVersion = sizeof(int64_t) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aVersion, pSubBlock->szVersion);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, pSubBlock->vsize);
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, pSubBlock->szVersion);
// TSKEY
pSubBlock->ksize = sizeof(TSKEY) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aTSKEY, pSubBlock->ksize);
pSubBlock->szTSKEY = sizeof(TSKEY) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aTSKEY, pSubBlock->szTSKEY);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aTSKEY, pSubBlock->ksize);
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aTSKEY, pSubBlock->szTSKEY);
// cksm
size = sizeof(cksm);
......@@ -1470,19 +1470,19 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
pSubBlock->vsize = n;
pSubBlock->szVersion = n;
// TSKEY
n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow,
*ppBuf1 + pSubBlock->vsize, size - pSubBlock->vsize, cmprAlg, *ppBuf2, size);
*ppBuf1 + pSubBlock->szVersion, size - pSubBlock->szVersion, cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
pSubBlock->ksize = n;
pSubBlock->szTSKEY = n;
// cksm
n = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
n = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
ASSERT(n <= size);
taosCalcChecksumAppend(0, *ppBuf1, n);
......@@ -1493,7 +1493,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
goto _err;
}
}
pSubBlock->bsize += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM));
pSubBlock->szBlock += (pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM));
// other columns
offset = 0;
......@@ -1580,7 +1580,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
// state
offset = offset + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
pSubBlock->bsize = pSubBlock->bsize + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
pSubBlock->szBlock = pSubBlock->szBlock + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
}
code = tMapDataPutItem(&pSubBlock->mBlockCol, pBlockCol, tPutBlockCol);
......@@ -1588,9 +1588,9 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
}
if (pBlock->last) {
pWriter->wSet.fLast.size += pSubBlock->bsize;
pWriter->wSet.fLast.size += pSubBlock->szBlock;
} else {
pWriter->wSet.fData.size += pSubBlock->bsize;
pWriter->wSet.fData.size += pSubBlock->szBlock;
}
tsdbFree(pBuf1);
......
......@@ -388,9 +388,9 @@ void tBlockReset(SBlock *pBlock) {
pBlock->aSubBlock[iSubBlock].nRow = 0;
pBlock->aSubBlock[iSubBlock].cmprAlg = -1;
pBlock->aSubBlock[iSubBlock].offset = -1;
pBlock->aSubBlock[iSubBlock].vsize = -1;
pBlock->aSubBlock[iSubBlock].ksize = -1;
pBlock->aSubBlock[iSubBlock].bsize = -1;
pBlock->aSubBlock[iSubBlock].szVersion = -1;
pBlock->aSubBlock[iSubBlock].szTSKEY = -1;
pBlock->aSubBlock[iSubBlock].szBlock = -1;
tMapDataReset(&pBlock->aSubBlock->mBlockCol);
}
pBlock->nSubBlock = 0;
......@@ -417,9 +417,9 @@ int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest) {
pBlockDest->aSubBlock[iSubBlock].nRow = pBlockSrc->aSubBlock[iSubBlock].nRow;
pBlockDest->aSubBlock[iSubBlock].cmprAlg = pBlockSrc->aSubBlock[iSubBlock].cmprAlg;
pBlockDest->aSubBlock[iSubBlock].offset = pBlockSrc->aSubBlock[iSubBlock].offset;
pBlockDest->aSubBlock[iSubBlock].vsize = pBlockSrc->aSubBlock[iSubBlock].vsize;
pBlockDest->aSubBlock[iSubBlock].ksize = pBlockSrc->aSubBlock[iSubBlock].ksize;
pBlockDest->aSubBlock[iSubBlock].bsize = pBlockSrc->aSubBlock[iSubBlock].bsize;
pBlockDest->aSubBlock[iSubBlock].szVersion = pBlockSrc->aSubBlock[iSubBlock].szVersion;
pBlockDest->aSubBlock[iSubBlock].szTSKEY = pBlockSrc->aSubBlock[iSubBlock].szTSKEY;
pBlockDest->aSubBlock[iSubBlock].szBlock = pBlockSrc->aSubBlock[iSubBlock].szBlock;
code = tMapDataCopy(&pBlockSrc->aSubBlock[iSubBlock].mBlockCol, &pBlockDest->aSubBlock[iSubBlock].mBlockCol);
if (code) goto _exit;
}
......@@ -444,9 +444,9 @@ int32_t tPutBlock(uint8_t *p, void *ph) {
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow);
n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].vsize);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].ksize);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].bsize);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szVersion);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szTSKEY);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock);
n += tPutMapData(p ? p + n : p, &pBlock->aSubBlock[iSubBlock].mBlockCol);
}
......@@ -469,9 +469,9 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].nRow);
n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].vsize);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].ksize);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].bsize);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szVersion);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szTSKEY);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock);
n += tGetMapData(p + n, &pBlock->aSubBlock[iSubBlock].mBlockCol);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册