提交 d032dec4 编写于 作者: H Hongze Cheng

more work

上级 1d012a47
......@@ -117,6 +117,7 @@ int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo);
// SBlockCol
int32_t tPutBlockCol(uint8_t *p, void *ph);
int32_t tGetBlockCol(uint8_t *p, void *ph);
int32_t tBlockColCmprFn(const void *p1, const void *p2);
// SBlock
#define tBlockInit() ((SBlock){0})
void tBlockReset(SBlock *pBlock);
......@@ -229,7 +230,7 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData);
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData,
uint8_t **ppBuf1, uint8_t **ppBuf2);
int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA);
......
......@@ -618,11 +618,228 @@ _err:
return code;
}
static int32_t tsdbRecoverBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) {
int32_t code = 0;
int64_t size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
int64_t n;
if (!taosCheckChecksumWhole(pBuf, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * pSubBlock->nRow);
if (code) goto _err;
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pSubBlock->nRow);
if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
ASSERT(pSubBlock->vsize == sizeof(int64_t) * pSubBlock->nRow);
ASSERT(pSubBlock->ksize == sizeof(TSKEY) * pSubBlock->nRow);
// VERSION
memcpy(pBlockData->aVersion, pBuf, pSubBlock->vsize);
// TSKEY
pBuf = pBuf + pSubBlock->vsize;
memcpy(pBlockData->aTSKEY, pBuf + pSubBlock->vsize, pSubBlock->ksize);
} else {
size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf, size);
if (code) goto _err;
}
// VERSION
n = tsDecompressBigint(pBuf, pSubBlock->vsize, pSubBlock->nRow, (char *)pBlockData->aVersion,
sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
// TSKEY
pBuf = pBuf + pSubBlock->vsize;
n = tsDecompressTimestamp(pBuf, pSubBlock->ksize, pSubBlock->nRow, (char *)pBlockData->aTSKEY,
sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
}
return code;
_err:
return code;
}
static int32_t tsdbRecoverColData(SBlockData *pBlockData, SSubBlock *pSubBlock, SBlockCol *pBlockCol,
SColData *pColData, uint8_t *pBuf, uint8_t **ppBuf) {
int32_t code = 0;
int64_t size;
int64_t n;
ASSERT(pBlockCol->flag != HAS_NULL);
if (!taosCheckChecksumWhole(pBuf, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
pColData->nVal = pSubBlock->nRow;
pColData->flag = pBlockCol->flag;
// bitmap
if (pBlockCol->flag != HAS_VALUE) {
size = BIT2_SIZE(pSubBlock->nRow);
code = tsdbRealloc(&pColData->pBitMap, size);
if (code) goto _err;
ASSERT(pBlockCol->bsize == size);
memcpy(pColData->pBitMap, pBuf, size);
} else {
ASSERT(pBlockCol->bsize == 0);
}
pBuf = pBuf + pBlockCol->bsize;
// value
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
pColData->nData = pBlockCol->osize;
} else {
pColData->nData = tDataTypes[pBlockCol->type].bytes * pSubBlock->nRow;
}
code = tsdbRealloc(&pColData->pData, pColData->nData);
if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
memcpy(pColData->pData, pBuf, pColData->nData);
} else {
size = pColData->nData + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf, size);
if (code) goto _err;
}
n = tDataTypes[pBlockCol->type].decompFunc(pBuf, pBlockCol->csize, pSubBlock->nRow, pColData->pData,
pColData->nData, pSubBlock->cmprAlg, *ppBuf, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
ASSERT(n == pColData->nData);
}
return code;
_err:
return code;
}
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData) {
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) {
int32_t code = 0;
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD;
// TODO
uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL;
ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID);
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
int64_t offset;
int64_t size;
int64_t n;
tBlockDataReset(pBlockData);
pBlockData->nRow = pSubBlock->nRow;
// TSDBKEY
offset = pSubBlock->offset + sizeof(SBlockDataHdr);
size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
n = taosLSeekFile(pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosReadFile(pFD, *ppBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
if (code) goto _err;
// OTHER
SBlockCol blockCol;
SBlockCol *pBlockCol = &blockCol;
SColData *pColData;
for (int32_t iCol = 0; iCol < nCol; iCol++) {
int16_t cid = aColId[iCol];
if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) ==
0) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData);
if (code) goto _err;
tColDataReset(pColData, pBlockCol->cid, pBlockCol->type);
if (pBlockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
if (code) goto _err;
}
} else {
offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset;
size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
code = tsdbRealloc(ppBuf1, size);
if (code) goto _err;
// seek
n = taosLSeekFile(pFD, offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// read
n = taosReadFile(pFD, *ppBuf1, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2);
if (code) goto _err;
}
}
}
}
tsdbFree(pBuf1);
tsdbFree(pBuf2);
return code;
_err:
tsdbError("vgId:%d tsdb read col data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
tsdbFree(pBuf1);
tsdbFree(pBuf2);
return code;
}
......@@ -691,42 +908,8 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
pBlockData->nRow = pSubBlock->nRow;
p = *ppBuf1 + sizeof(*pHdr);
code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, pBlockData->nRow * sizeof(int64_t));
if (code) goto _err;
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, pBlockData->nRow * sizeof(TSKEY));
code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2);
if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
ASSERT(pSubBlock->vsize == sizeof(int64_t) * pSubBlock->nRow);
ASSERT(pSubBlock->ksize == sizeof(TSKEY) * pSubBlock->nRow);
// VERSION
memcpy(pBlockData->aVersion, p, pSubBlock->vsize);
// TSKEY
memcpy(pBlockData->aTSKEY, p + pSubBlock->vsize, pSubBlock->ksize);
} else {
size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
// VERSION
n = tsDecompressBigint(p, pSubBlock->vsize, pSubBlock->nRow, (char *)pBlockData->aVersion,
sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf2, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
// TSKEY
n = tsDecompressTimestamp(p + pSubBlock->vsize, pSubBlock->ksize, pSubBlock->nRow, (char *)pBlockData->aTSKEY,
sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf2, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
}
p = p + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM);
for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) {
......@@ -744,56 +927,14 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
if (code) goto _err;
}
continue;
}
pColData->nVal = pSubBlock->nRow;
pColData->flag = pBlockCol->flag;
// bitmap
if (pBlockCol->flag != HAS_VALUE) {
size = BIT2_SIZE(pSubBlock->nRow);
code = tsdbRealloc(&pColData->pBitMap, size);
if (code) goto _err;
ASSERT(pBlockCol->bsize == size);
memcpy(pColData->pBitMap, p, size);
} else {
ASSERT(pBlockCol->bsize == 0);
}
p = p + pBlockCol->bsize;
// value
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
pColData->nData = pBlockCol->osize;
} else {
pColData->nData = tDataTypes[pBlockCol->type].bytes * pSubBlock->nRow;
}
code = tsdbRealloc(&pColData->pData, pColData->nData);
if (code) goto _err;
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
memcpy(pColData->pData, p, pColData->nData);
} else {
size = pColData->nData + COMP_OVERFLOW_BYTES;
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
code = tsdbRealloc(ppBuf2, size);
if (code) goto _err;
}
n = tDataTypes[pBlockCol->type].decompFunc(p, pBlockCol->csize, pSubBlock->nRow, pColData->pData, pColData->nData,
pSubBlock->cmprAlg, *ppBuf2, size);
if (n < 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _err;
}
code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, p, ppBuf2);
if (code) goto _err;
ASSERT(n == pColData->nData);
p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM);
}
p = p + pBlockCol->csize + sizeof(TSCKSUM);
}
// TODO
return code;
_err:
......
......@@ -545,6 +545,16 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
return n;
}
int32_t tBlockColCmprFn(const void *p1, const void *p2) {
if (((SBlockCol *)p1)->cid < ((SBlockCol *)p2)->cid) {
return -1;
} else if (((SBlockCol *)p1)->cid > ((SBlockCol *)p2)->cid) {
return 1;
}
return 0;
}
// SDelIdx ======================================================
int32_t tCmprDelIdx(void const *lhs, void const *rhs) {
SDelIdx *lDelIdx = *(SDelIdx **)lhs;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册