提交 e32961ce 编写于 作者: H Hongze Cheng

more work

上级 7d1574fe
......@@ -204,7 +204,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock);
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg);
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize);
SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter);
......@@ -213,8 +213,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData);
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
int16_t *aColId, int32_t nCol, uint8_t **ppBuf1, uint8_t **ppBuf2);
uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA);
// SDelFWriter
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
......@@ -365,6 +367,8 @@ typedef struct {
} SBlockCol;
typedef struct {
int64_t nRow;
int8_t cmprAlg;
int64_t offset;
int64_t ksize;
int64_t bsize;
......@@ -379,7 +383,6 @@ struct SBlock {
int32_t nRow;
int8_t last;
int8_t hasDup;
int8_t cmprAlg;
int8_t nSubBlock;
SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS];
};
......@@ -404,6 +407,7 @@ struct SColData {
int32_t *aOffset;
int32_t nData;
uint8_t *pData;
uint8_t *pBuf;
};
struct SBlockData {
......
......@@ -722,8 +722,7 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) {
row = tBlockDataLastRow(pBlockData);
if (tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&row)) < 0) pBlock->maxKey = TSDBROW_KEY(&row);
pBlock->last = pBlockData->nRow < pCommitter->minRow ? 1 : 0;
pBlock->cmprAlg = pCommitter->cmprAlg;
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock);
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
if (code) goto _err;
// Design SMA and write SMA to file
......@@ -760,17 +759,10 @@ _err:
static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx) {
int32_t code = 0;
SMapData *mBlockO = &pCommitter->oBlockMap;
SMapData *mBlockN = &pCommitter->nBlockMap;
SBlock *pBlockO = &pCommitter->oBlock;
SMapData *mBlockN = &pCommitter->nBlockMap;
SBlock *pBlockN = &pCommitter->nBlock;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = oBlockIdx->suid,
.uid = oBlockIdx->uid,
.maxKey = oBlockIdx->maxKey,
.minKey = oBlockIdx->minKey,
.minVersion = oBlockIdx->minVersion,
.maxVersion = oBlockIdx->maxVersion,
.offset = -1,
.size = -1};
SBlockIdx *pBlockIdx = &(SBlockIdx){0};
SBlockData *pBlockDataO = &pCommitter->oBlockData;
// read
......@@ -784,13 +776,12 @@ static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx)
if (pBlockO->last) {
ASSERT(iBlock == mBlockO->nItem - 1);
code = tsdbReadBlockData(pCommitter->pReader, oBlockIdx, pBlockO, pBlockDataO, NULL, -1, NULL, NULL);
code = tsdbReadBlockData(pCommitter->pReader, oBlockIdx, pBlockO, pBlockDataO, NULL, NULL);
if (code) goto _err;
tBlockReset(pBlockN);
pBlockN->last = 1;
pBlockN->cmprAlg = pBlockO->cmprAlg;
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockDataO, NULL, NULL, pBlockIdx, pBlockN);
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockDataO, NULL, NULL, pBlockIdx, pBlockN, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(mBlockN, pBlockN, tPutBlock);
......@@ -802,6 +793,7 @@ static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx)
}
// SBlock
*pBlockIdx = *oBlockIdx;
code = tsdbWriteBlock(pCommitter->pWriter, mBlockN, NULL, pBlockIdx);
if (code) goto _err;
......@@ -812,7 +804,7 @@ static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx)
return code;
_err:
tsdbError("vgId:%d tsdb Commit disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d tsdb commit disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -591,13 +591,112 @@ _err:
return code;
}
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
int16_t *aColId, int32_t nCol, uint8_t **ppBuf1, uint8_t **ppBuf2) {
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData) {
int32_t code = 0;
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
// TODO
return code;
}
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
uint8_t **ppBuf1, uint8_t **ppBuf2) {
int32_t code = 0;
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL;
SBlockCol *pBlockCol = &(SBlockCol){};
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
uint8_t *p;
int64_t n;
// realloc
code = tsdbRealloc(ppBuf1, pSubBlock->bsize);
if (code) goto _err;
// seek
n = taosLSeekFile(pFD, pSubBlock->offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// read
n = taosReadFile(pFD, *ppBuf1, pSubBlock->bsize);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < pSubBlock->bsize) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
p = *ppBuf1;
SBlockDataHdr *pHdr = (SBlockDataHdr *)p;
ASSERT(pHdr->delimiter == TSDB_FILE_DLMT);
ASSERT(pHdr->suid == pBlockIdx->suid);
ASSERT(pHdr->uid == pBlockIdx->uid);
p += sizeof(*pHdr);
if (!taosCheckChecksumWhole(p, pSubBlock->ksize)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
p += pSubBlock->ksize;
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol);
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
if (pBlockCol->flag == HAS_NULL) continue;
if (!taosCheckChecksumWhole(p, pBlockCol->size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
p += pBlockCol->size;
}
// recover
pBlockData->nRow = pSubBlock->nRow;
p = *ppBuf1 + sizeof(*pHdr);
code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, pBlockData->nRow * sizeof(int64_t));
if (code) goto _err;
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, pBlockData->nRow * sizeof(TSKEY));
if (code) goto _err;
p += pSubBlock->ksize;
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol);
if (pBlockCol->flag == HAS_NONE) {
// All NULL value
} else {
// decompress
p += pBlockCol->size;
}
}
}
if (pBuf1) tsdbFree(pBuf1);
if (pBuf2) tsdbFree(pBuf2);
return code;
_err:
tsdbError("vgId:%d tsdb read block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
if (pBuf1) tsdbFree(pBuf1);
if (pBuf2) tsdbFree(pBuf2);
return code;
}
int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA) {
int32_t code = 0;
// TODO
......@@ -1000,7 +1099,7 @@ _err:
}
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock) {
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg) {
int32_t code = 0;
SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++];
SBlockCol *pBlockCol = &(SBlockCol){0};
......@@ -1017,6 +1116,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
pSubBlock->nRow = pBlockData->nRow;
pSubBlock->cmprAlg = cmprAlg;
if (pBlock->last) {
pSubBlock->offset = pWriter->wSet.fLast.size;
} else {
......@@ -1034,7 +1135,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
// TSDBKEY
pSubBlock->ksize = 0;
if (pBlock->cmprAlg == NO_COMPRESSION) {
if (cmprAlg == NO_COMPRESSION) {
// TSKEY
size = sizeof(TSKEY) * pBlockData->nRow;
n = taosWriteFile(pFileFD, pBlockData->aTSKEY, size);
......@@ -1064,21 +1165,21 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
}
pSubBlock->ksize += size;
} else {
ASSERT(pBlock->cmprAlg == ONE_STAGE_COMP || pBlock->cmprAlg == TWO_STAGE_COMP);
ASSERT(cmprAlg == ONE_STAGE_COMP || cmprAlg == TWO_STAGE_COMP);
size = (sizeof(TSKEY) + sizeof(int64_t)) * pBlockData->nRow + COMP_OVERFLOW_BYTES * 2 + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
if (pBlock->cmprAlg == TWO_STAGE_COMP) {
if (cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
// TSKEY
n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow, *ppBuf1,
size, pBlock->cmprAlg, *ppBuf2, size);
size, cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
......@@ -1087,7 +1188,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
// version
n = tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow,
*ppBuf1 + pSubBlock->ksize, size - pSubBlock->ksize, pBlock->cmprAlg, *ppBuf2, size);
*ppBuf1 + pSubBlock->ksize, size - pSubBlock->ksize, cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
......@@ -1141,7 +1242,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
}
// data
if (pBlock->cmprAlg == NO_COMPRESSION) {
if (cmprAlg == NO_COMPRESSION) {
// data
n = taosWriteFile(pFileFD, pColData->pData, pColData->nData);
if (n < 0) {
......@@ -1164,14 +1265,14 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
if (pBlock->cmprAlg == TWO_STAGE_COMP) {
if (cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
// data
n = tDataTypes[pColData->type].compFunc(pColData->pData, pColData->nData, pBlockData->nRow, *ppBuf1, size,
pBlock->cmprAlg, *ppBuf2, size);
cmprAlg, *ppBuf2, size);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
......
......@@ -118,14 +118,13 @@ _exit:
int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
int32_t n = 0;
int32_t maxOffset;
ASSERT(pMapData->flag == TSDB_OFFSET_I32);
ASSERT(pMapData->nItem > 0);
maxOffset = tMapDataGetOffset(pMapData, pMapData->nItem - 1);
n += tPutI32v(p ? p + n : p, pMapData->nItem);
if (pMapData->nItem) {
int32_t maxOffset = tMapDataGetOffset(pMapData, pMapData->nItem - 1);
if (maxOffset <= INT8_MAX) {
n += tPutU8(p ? p + n : p, TSDB_OFFSET_I8);
if (p) {
......@@ -155,6 +154,7 @@ int32_t tPutMapData(uint8_t *p, SMapData *pMapData) {
}
}
n += tPutBinary(p ? p + n : p, pMapData->pData, pMapData->nData);
}
return n;
}
......@@ -163,6 +163,7 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
int32_t n = 0;
n += tGetI32v(p + n, &pMapData->nItem);
if (pMapData->nItem) {
n += tGetU8(p + n, &pMapData->flag);
pMapData->pOfst = p + n;
switch (pMapData->flag) {
......@@ -180,6 +181,7 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
ASSERT(0);
}
n += tGetBinary(p + n, &pMapData->pData, &pMapData->nData);
}
return n;
}
......@@ -330,8 +332,9 @@ void tBlockReset(SBlock *pBlock) {
pBlock->nRow = 0;
pBlock->last = -1;
pBlock->hasDup = 0;
pBlock->cmprAlg = -1;
for (int8_t iSubBlock = 0; iSubBlock < TSDB_MAX_SUBBLOCKS; iSubBlock++) {
pBlock->aSubBlock[iSubBlock].nRow = 0;
pBlock->aSubBlock[iSubBlock].cmprAlg = -1;
pBlock->aSubBlock[iSubBlock].offset = -1;
pBlock->aSubBlock[iSubBlock].ksize = -1;
pBlock->aSubBlock[iSubBlock].bsize = -1;
......@@ -357,9 +360,10 @@ int32_t tPutBlock(uint8_t *p, void *ph) {
n += tPutI32v(p ? p + n : p, pBlock->nRow);
n += tPutI8(p ? p + n : p, pBlock->last);
n += tPutI8(p ? p + n : p, pBlock->hasDup);
n += tPutI8(p ? p + n : p, pBlock->cmprAlg);
n += tPutI8(p ? p + n : p, pBlock->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow);
n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].ksize);
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].bsize);
......@@ -380,9 +384,10 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
n += tGetI32v(p + n, &pBlock->nRow);
n += tGetI8(p + n, &pBlock->last);
n += tGetI8(p + n, &pBlock->hasDup);
n += tGetI8(p + n, &pBlock->cmprAlg);
n += tGetI8(p + n, &pBlock->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].nRow);
n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].ksize);
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].bsize);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册