提交 465816e5 编写于 作者: H Hongze Cheng

more work

上级 92559ef2
......@@ -50,6 +50,7 @@ typedef struct SBlock SBlock;
typedef struct SBlockStatis SBlockStatis;
typedef struct SAggrBlkCol SAggrBlkCol;
typedef struct SColData SColData;
typedef struct SBlockDataHdr SBlockDataHdr;
typedef struct SBlockData SBlockData;
typedef struct SReadH SReadH;
......@@ -105,8 +106,8 @@ int32_t tsdbDataFWriterClose(SDataFWriter *pWriter, int8_t sync);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf, SBlockIdx *pBlockIdx,
SBlock *pBlock);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock);
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize);
// SDataFReader
......@@ -343,12 +344,11 @@ struct SBlock {
int32_t nRow;
int8_t last;
int8_t hasDup;
int8_t cmprAlg;
int8_t nSubBlock;
SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS];
};
int a = sizeof(SBlock);
struct SAggrBlkCol {
int16_t colId;
int16_t maxIndex;
......@@ -437,6 +437,12 @@ struct SBlockSMA {
SColSMA *aColSMA;
};
struct SBlockDataHdr {
uint8_t delimiter;
int64_t suid;
int64_t uid;
};
#ifdef __cplusplus
}
#endif
......
......@@ -721,20 +721,110 @@ _err:
return code;
}
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf, SBlockIdx *pBlockIdx,
SBlock *pBlock) {
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock) {
int32_t code = 0;
SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++];
SBlockCol bCol;
int64_t size;
int64_t n;
TdFilePtr *pFileFD = pWriter->pDataFD; // TODO
SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
TSCKSUM cksm;
uint8_t *p;
int64_t offset;
pSubBlock->offset = 0; // TODO: set as file offset
pSubBlock->offset = 0;
pSubBlock->ksize = 0;
pSubBlock->bsize = 0;
tMapDataClear(&pSubBlock->mBlockCol);
// HDR
n = taosWriteFile(pFileFD, &hdr, sizeof(hdr));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pSubBlock->bsize += n;
// TSDBKEY
pSubBlock->ksize = 0;
if (pBlock->cmprAlg == NO_COMPRESSION) {
// TSKEY
size = sizeof(TSKEY) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aTSKEY, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pSubBlock->ksize += size;
cksm = taosCalcChecksum(0, (uint8_t *)pBlockData->aTSKEY, size);
// version
size = sizeof(int64_t) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aVersion, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pSubBlock->ksize += size;
cksm = taosCalcChecksum(cksm, (uint8_t *)pBlockData->aVersion, size);
// cksm
size = sizeof(cksm);
n = taosWriteFile(pFileFD, (uint8_t *)&cksm, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pSubBlock->ksize += size;
} else {
ASSERT(pBlock->cmprAlg == ONE_STAGE_COMP || pBlock->cmprAlg == TWO_STAGE_COMP);
size = (sizeof(TSKEY) + sizeof(int64_t)) * pBlockData->nRow + COMP_OVERFLOW_BYTES * 2 + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
if (pBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
// TSKEY
n = tsCompressTimestamp(pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow, *ppBuf1, size,
pBlock->cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
pSubBlock->ksize += n;
// version
n = tsCompressBigint(pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow,
*ppBuf1 + pSubBlock->ksize, size - pSubBlock->ksize, pBlock->cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
pSubBlock->ksize += n;
// cksm
pSubBlock->ksize += sizeof(TSCKSUM);
ASSERT(pSubBlock->ksize <= size);
taosCalcChecksumAppend(0, *ppBuf1, pSubBlock->ksize);
// write
n = taosWriteFile(pFileFD, *ppBuf1, pSubBlock->ksize);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
}
pSubBlock->bsize += pSubBlock->ksize;
// other columns
offset = 0;
tMapDataClear(&pSubBlock->mBlockCol);
for (int32_t iCol = 0; iCol < pBlockData->nCol; iCol++) {
SColData *pColData = &pBlockData->aColData[iCol];
......@@ -747,14 +837,77 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
bCol.flag = pColData->flags;
if (pColData->flags != HAS_NULL) {
cksm = 0;
bCol.offset = offset;
bCol.size = 0;
// bitmap
if (pColData->flags != HAS_VALUE) {
// handle bitmap
// TODO: optimize bitmap part
n = taosWriteFile(pFileFD, pColData->pBitMap, BIT2_SIZE(pBlockData->nRow));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
cksm = taosCalcChecksum(cksm, pColData->pBitMap, n);
bCol.size += n;
}
// data
if (pBlock->cmprAlg == NO_COMPRESSION) {
// data
n = taosWriteFile(pFileFD, pColData->pData, pColData->nData);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
bCol.size += n;
// checksum
cksm = taosCalcChecksum(cksm, pColData->pData, pColData->nData);
n = taosWriteFile(pFileFD, &cksm, sizeof(cksm));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
bCol.size += n;
} else {
size = pColData->nData + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
if (pBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
// handle real data
// data
n = tDataTypes->compFunc(pColData->pData, pColData->nData, pBlockData->nRow, *ppBuf1, size, pBlock->cmprAlg,
*ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
// cksm
n += sizeof(TSCKSUM);
ASSERT(n <= size);
taosCalcChecksumAppend(cksm, *ppBuf1, n);
bCol.size += n;
// write
n = taosWriteFile(pFileFD, *ppBuf1, bCol.size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
}
// bCol.offset = ;
// bCol.size = ;
// state
offset += bCol.size;
pSubBlock->bsize += bCol.size;
}
code = tMapDataPutItem(&pSubBlock->mBlockCol, &bCol, tPutBlockCol);
......@@ -764,7 +917,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
return code;
_err:
tsdbError("vgId:%d write block data failed since %s", pWriter->pTsdb, tstrerror(code));
tsdbError("vgId:%d write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -443,8 +443,11 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK
// TSDBROW ======================================================
TSDBKEY tsdbRowKey(TSDBROW *pRow) {
// TODO: support SBlockData version
// if (pRow->type == 0) {
return (TSDBKEY){.version = pRow->version, .ts = pRow->pTSRow->ts};
// } else {
// return (TSDBKEY){.version = pRow->pBlockData->aVersion[pRow->iRow], .ts = pRow->pBlockData->aTSKEY[pRow->iRow]};
// }
}
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册