提交 d5c2d594 编写于 作者: H Hongze Cheng

more work

上级 2cb114ca
...@@ -729,21 +729,6 @@ _err: ...@@ -729,21 +729,6 @@ _err:
return code; return code;
} }
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) {
int32_t code = 0;
ASSERT(0);
// TODO
return code;
}
static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) { static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) {
int32_t code = 0; int32_t code = 0;
int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
...@@ -892,20 +877,25 @@ _err: ...@@ -892,20 +877,25 @@ _err:
return code; return code;
} }
static int32_t tsdbReadBlockCol(SSubBlock *pSubBlock, uint8_t *p, SArray *aBlockCol) { static int32_t tsdbReadBlockCol(uint8_t *pBuf, int32_t szBlockCol, SBlockDataHdr *pHdr, SArray *aBlockCol) {
int32_t code = 0; int32_t code = 0;
int32_t n = 0; int32_t n = 0;
SBlockCol blockCol; SBlockCol blockCol;
SBlockCol *pBlockCol = &blockCol; SBlockCol *pBlockCol = &blockCol;
if (!taosCheckChecksumWhole(p, pSubBlock->szBlockCol + sizeof(TSCKSUM))) { // checksum
if (!taosCheckChecksumWhole(pBuf, szBlockCol + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _err; goto _err;
} }
// hdr
*pHdr = *(SBlockDataHdr *)pBuf;
n += sizeof(SBlockDataHdr); n += sizeof(SBlockDataHdr);
while (n < pSubBlock->szBlockCol) {
n += tGetBlockCol(p + n, pBlockCol); // aBlockCol
while (n < szBlockCol) {
n += tGetBlockCol(pBuf + n, pBlockCol);
if (taosArrayPush(aBlockCol, pBlockCol) == NULL) { if (taosArrayPush(aBlockCol, pBlockCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -913,7 +903,7 @@ static int32_t tsdbReadBlockCol(SSubBlock *pSubBlock, uint8_t *p, SArray *aBlock ...@@ -913,7 +903,7 @@ static int32_t tsdbReadBlockCol(SSubBlock *pSubBlock, uint8_t *p, SArray *aBlock
} }
} }
ASSERT(n == pSubBlock->szBlockCol); ASSERT(n == szBlockCol);
return code; return code;
...@@ -921,10 +911,48 @@ _err: ...@@ -921,10 +911,48 @@ _err:
return code; return code;
} }
static int32_t tsdbReadDataArray(uint8_t *pInput, int32_t szInput, int32_t nEle, int8_t type, int8_t cmprAlg,
uint8_t **ppOut, uint8_t **ppBuf) {
int32_t code = 0;
int32_t size;
// size
if (IS_VAR_DATA_TYPE(type)) {
size = nEle;
} else {
size = tDataTypes[type].bytes * nEle;
}
// alloc
code = tRealloc(ppOut, size);
if (code) goto _exit;
// decode
if (cmprAlg == NO_COMPRESSION) {
ASSERT(szInput == size);
memcpy(*ppOut, pInput, size);
} else {
if (cmprAlg == TWO_STAGE_COMP) {
code = tRealloc(ppBuf, size + COMP_OVERFLOW_BYTES);
if (code) goto _exit;
int32_t n =
tDataTypes[type].decompFunc(pInput, szInput, nEle, *ppOut, size, cmprAlg, *ppBuf, size + COMP_OVERFLOW_BYTES);
if (n <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _exit;
}
}
}
_exit:
return code;
}
static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock,
int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) { uint8_t **ppBuf2) {
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; TdFilePtr pFD = pReader->pDataFD;
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
SArray *aBlockCol = NULL; SArray *aBlockCol = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -974,7 +1002,7 @@ static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, S ...@@ -974,7 +1002,7 @@ static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, S
goto _err; goto _err;
} }
code = tsdbReadBlockCol(pSubBlock, *ppBuf1, aBlockCol); code = tsdbReadBlockCol(*ppBuf1, pSubBlock->szBlock, NULL /*todo*/, aBlockCol);
if (code) goto _err; if (code) goto _err;
code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM), ppBuf2); code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM), ppBuf2);
...@@ -1093,13 +1121,13 @@ _err: ...@@ -1093,13 +1121,13 @@ _err:
return code; return code;
} }
static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlock *pBlock, int32_t iSubBlock, SBlockData *pBlockData,
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { uint8_t **ppBuf1, uint8_t **ppBuf2) {
int32_t code = 0; int32_t code = 0;
uint8_t *p; uint8_t *p;
int64_t size; int64_t size;
int64_t n; int64_t n;
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; TdFilePtr pFD = pReader->pDataFD;
SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock];
SArray *aBlockCol = NULL; SArray *aBlockCol = NULL;
...@@ -1175,20 +1203,18 @@ _err: ...@@ -1175,20 +1203,18 @@ _err:
return code; return code;
} }
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf1, uint8_t **ppBuf2) { uint8_t **ppBuf2) {
int32_t code = 0; int32_t code = 0;
TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; uint8_t *pBuf1 = NULL;
uint8_t *pBuf1 = NULL; uint8_t *pBuf2 = NULL;
uint8_t *pBuf2 = NULL;
int32_t iSubBlock;
if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2; if (!ppBuf2) ppBuf2 = &pBuf2;
// read the first sub-block // read the first sub-block
iSubBlock = 0; int32_t iSubBlock = 0;
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2); code = tsdbReadSubBlockData(pReader, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2);
if (code) goto _err; if (code) goto _err;
// read remain block data and do merg // read remain block data and do merg
...@@ -1199,7 +1225,7 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p ...@@ -1199,7 +1225,7 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p
tBlockDataInit(pBlockData1); tBlockDataInit(pBlockData1);
tBlockDataInit(pBlockData2); tBlockDataInit(pBlockData2);
for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); code = tsdbReadSubBlockData(pReader, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2);
if (code) { if (code) {
tBlockDataClear(pBlockData1, 1); tBlockDataClear(pBlockData1, 1);
tBlockDataClear(pBlockData2, 1); tBlockDataClear(pBlockData2, 1);
...@@ -1230,14 +1256,138 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p ...@@ -1230,14 +1256,138 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p
ASSERT(tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&tBlockDataFirstRow(pBlockData))) == 0); ASSERT(tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&tBlockDataFirstRow(pBlockData))) == 0);
ASSERT(tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&tBlockDataLastRow(pBlockData))) == 0); ASSERT(tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&tBlockDataLastRow(pBlockData))) == 0);
if (pBuf1) tFree(pBuf1); tFree(pBuf1);
if (pBuf2) tFree(pBuf2); tFree(pBuf2);
return code; return code;
_err: _err:
tsdbError("vgId:%d, tsdb read block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d, tsdb read block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
if (pBuf1) tFree(pBuf1); tFree(pBuf1);
if (pBuf2) tFree(pBuf2); tFree(pBuf2);
return code;
}
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1,
uint8_t **ppBuf2) {
int32_t code = 0;
tBlockDataReset(pBlockData);
uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL;
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
// realloc
code = tRealloc(ppBuf1, pBlockL->szBlock);
if (code) goto _err;
// seek
int64_t n = taosLSeekFile(pReader->pLastFD, pBlockL->offset, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// read
n = taosReadFile(pReader->pLastFD, *ppBuf1, pBlockL->szBlock);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < pBlockL->szBlock) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// decode block col
SBlockDataHdr hdr;
SArray *aBlockCol = taosArrayInit(0, sizeof(SBlockCol));
uint8_t *p = *ppBuf1;
if (aBlockCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbReadBlockCol(p, pBlockL->szBlockCol, &hdr, aBlockCol);
if (code) goto _err;
p += pBlockL->szBlockCol + sizeof(TSCKSUM);
// checksum
if (!taosCheckChecksumWhole(p, pBlockL->szUid + pBlockL->szVer + pBlockL->szTSKEY + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// UID
if (hdr.uid == 0) {
code = tsdbReadDataArray(p, pBlockL->szUid, pBlockL->nRow, TSDB_DATA_TYPE_BIGINT, pBlockL->cmprAlg,
(uint8_t **)&pBlockData->aUid, ppBuf2);
if (code) goto _err;
} else {
ASSERT(pBlockL->szUid == 0);
}
p += pBlockL->szUid;
// VERSION
code = tsdbReadDataArray(p, pBlockL->szVer, pBlockL->nRow, TSDB_DATA_TYPE_BIGINT, pBlockL->cmprAlg,
(uint8_t **)&pBlockData->aVersion, ppBuf2);
if (code) goto _err;
p += pBlockL->szVer;
// TSKEY
code = tsdbReadDataArray(p, pBlockL->szTSKEY, pBlockL->nRow, TSDB_DATA_TYPE_TIMESTAMP, pBlockL->cmprAlg,
(uint8_t **)&pBlockData->aTSKEY, ppBuf2);
if (code) goto _err;
p += pBlockL->szTSKEY;
p += sizeof(TSCKSUM);
// COLUMN
code = tBlockDataSetSchema(pBlockData, NULL, hdr.suid, hdr.uid);
if (code) goto _err;
for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(aBlockCol); iBlockCol++) {
SBlockCol *pBlockCol = (SBlockCol *)taosArrayGet(aBlockCol, iBlockCol);
SColData *pColData;
// checksum
if (!taosCheckChecksumWhole(p, pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM))) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// add SColData
code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData);
if (code) goto _err;
tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn);
pColData->nVal = pBlockL->nRow;
pColData->flag = pBlockCol->flag;
// bitmap
if (pBlockCol->szBitmap) {
code = tsdbReadDataArray(p, pBlockCol->szBitmap, );
if (code) goto _err;
}
p += pBlockCol->szBitmap;
// offset
if (pBlockCol->szOffset) {
code = tsdbReadDataArray(p, pBlockCol->szOffset, );
if (code) goto _err;
}
p += pBlockCol->szOffset;
// value
pColData->nData = pBlockCol->szOrigin;
if (pColData->nData) {
// TODO
}
}
taosArrayDestroy(aBlockCol);
return code;
_err:
tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -1832,10 +1982,12 @@ static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, int8_t ...@@ -1832,10 +1982,12 @@ static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, int8_t
// VALUE // VALUE
if (pColData->flag != (HAS_NULL | HAS_NONE)) { if (pColData->flag != (HAS_NULL | HAS_NONE)) {
ASSERT(pColData->nData);
code = tsdbWriteDataArray(pColData->pData, pColData->nData, pColData->type, cmprAlg, &pBlockCol->szValue, ppBuf1, code = tsdbWriteDataArray(pColData->pData, pColData->nData, pColData->type, cmprAlg, &pBlockCol->szValue, ppBuf1,
nBuf1 + n, ppBuf2); nBuf1 + n, ppBuf2);
if (code) goto _err; if (code) goto _err;
} else { } else {
ASSERT(pColData->nData == 0);
pBlockCol->szValue = 0; pBlockCol->szValue = 0;
} }
n += pBlockCol->szValue; n += pBlockCol->szValue;
...@@ -2073,11 +2225,11 @@ int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -2073,11 +2225,11 @@ int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
pBlockL->minUid = pBlockData->aUid[0]; pBlockL->minUid = pBlockData->aUid[0];
pBlockL->maxUid = pBlockData->aUid[pBlockData->nRow - 1]; pBlockL->maxUid = pBlockData->aUid[pBlockData->nRow - 1];
} }
pBlockL->minVer = VERSION_MAX;
pBlockL->maxVer = VERSION_MIN;
pBlockL->nRow = pBlockData->nRow; pBlockL->nRow = pBlockData->nRow;
pBlockL->offset = pWriter->fLast.size; pBlockL->offset = pWriter->fLast.size;
pBlockL->cmprAlg = cmprAlg; pBlockL->cmprAlg = cmprAlg;
pBlockL->minVer = VERSION_MAX;
pBlockL->maxVer = VERSION_MIN;
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
pBlockL->minVer = TMIN(pBlockL->minVer, pBlockData->aVersion[iRow]); pBlockL->minVer = TMIN(pBlockL->minVer, pBlockData->aVersion[iRow]);
pBlockL->maxVer = TMAX(pBlockL->maxVer, pBlockData->aVersion[iRow]); pBlockL->maxVer = TMAX(pBlockL->maxVer, pBlockData->aVersion[iRow]);
......
...@@ -348,11 +348,11 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) { ...@@ -348,11 +348,11 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) {
n += tPutI8(p ? p + n : p, pBlockCol->flag); n += tPutI8(p ? p + n : p, pBlockCol->flag);
if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_NULL) {
n += tPutI32v(p ? p + n : p, pBlockCol->offset); n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin);
n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap); n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap);
n += tPutI32v(p ? p + n : p, pBlockCol->szOffset); n += tPutI32v(p ? p + n : p, pBlockCol->szOffset);
n += tPutI32v(p ? p + n : p, pBlockCol->szValue); n += tPutI32v(p ? p + n : p, pBlockCol->szValue);
n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin); n += tPutI32v(p ? p + n : p, pBlockCol->offset);
} }
return n; return n;
...@@ -370,11 +370,11 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) { ...@@ -370,11 +370,11 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE));
if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_NULL) {
n += tGetI32v(p + n, &pBlockCol->offset); n += tGetI32v(p + n, &pBlockCol->szOrigin);
n += tGetI32v(p + n, &pBlockCol->szBitmap); n += tGetI32v(p + n, &pBlockCol->szBitmap);
n += tGetI32v(p + n, &pBlockCol->szOffset); n += tGetI32v(p + n, &pBlockCol->szOffset);
n += tGetI32v(p + n, &pBlockCol->szValue); n += tGetI32v(p + n, &pBlockCol->szValue);
n += tGetI32v(p + n, &pBlockCol->szOrigin); n += tGetI32v(p + n, &pBlockCol->offset);
} }
return n; return n;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册