提交 94069ec2 编写于 作者: H Hongze Cheng

more work

上级 2d503135
...@@ -1926,6 +1926,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1926,6 +1926,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
} }
// encode ================= // encode =================
// columns
int32_t nBuf1 = 0; int32_t nBuf1 = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
...@@ -1956,27 +1957,34 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1956,27 +1957,34 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
hdr.szBlkCol += tPutBlockCol(NULL, &blockCol); hdr.szBlkCol += tPutBlockCol(NULL, &blockCol);
} }
// (uid + version + tskey + aBlockCol) // uid
if (pBlockData->uid == 0) { if (pBlockData->uid == 0) {
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg,
&pWriter->pBuf2, 0, &hdr.szUid, &pWriter->pBuf3); &pWriter->pBuf2, 0, &hdr.szUid, &pWriter->pBuf3);
if (code) goto _err; if (code) goto _err;
} }
// version
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
cmprAlg, &pWriter->pBuf2, hdr.szUid, &hdr.szVer, &pWriter->pBuf3); cmprAlg, &pWriter->pBuf2, hdr.szUid, &hdr.szVer, &pWriter->pBuf3);
if (code) goto _err; if (code) goto _err;
// tskey
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
cmprAlg, &pWriter->pBuf2, hdr.szUid + hdr.szVer, &hdr.szKey, &pWriter->pBuf3); cmprAlg, &pWriter->pBuf2, hdr.szUid + hdr.szVer, &hdr.szKey, &pWriter->pBuf3);
if (code) goto _err; if (code) goto _err;
// hdr
pBlkInfo->szKey = tPutDiskDataHdr(NULL, &hdr); pBlkInfo->szKey = tPutDiskDataHdr(NULL, &hdr);
code = tRealloc(&pWriter->pBuf3, pBlkInfo->szKey); code = tRealloc(&pWriter->pBuf3, pBlkInfo->szKey);
if (code) goto _err; if (code) goto _err;
tPutDiskDataHdr(pWriter->pBuf3, &hdr); tPutDiskDataHdr(pWriter->pBuf3, &hdr);
TSCKSUM cksm = taosCalcChecksum(0, pWriter->pBuf3, pBlkInfo->szKey); TSCKSUM cksm = taosCalcChecksum(0, pWriter->pBuf3, pBlkInfo->szKey);
code = tRealloc(&pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM));
if (code) goto _err;
taosCalcChecksumAppend(cksm, pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM));
// write ================= // write =================
TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
...@@ -1988,17 +1996,43 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1988,17 +1996,43 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
} }
// uid + version + tskey + (CKSM) // uid + version + tskey + (CKSM)
taosCalcChecksumAppend(cksm, pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM));
n = taosWriteFile(pFD, pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM)); n = taosWriteFile(pFD, pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM));
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
pBlkInfo->szKey = pBlkInfo->szKey + hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM); pBlkInfo->szKey = pBlkInfo->szKey + hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM);
pBlkInfo->szBlock += pBlkInfo->szKey;
// aBlockCol // aBlockCol
if (hdr.szBlkCol > 0) {
code = tRealloc(&pWriter->pBuf2, hdr.szBlkCol + sizeof(TSCKSUM));
if (code) goto _err;
n = 0;
for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(aBlockCol); iBlockCol++) {
n += tPutBlockCol(pWriter->pBuf2 + n, taosArrayGet(aBlockCol, iBlockCol));
}
ASSERT(n == hdr.szBlkCol);
taosCalcChecksumAppend(0, pWriter->pBuf2, hdr.szBlkCol + sizeof(TSCKSUM));
n = taosWriteFile(pFD, pWriter->pBuf2, hdr.szBlkCol + sizeof(TSCKSUM));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
}
// colmns // colmns
if (nBuf1 > 0) {
n = taosWriteFile(pFD, pWriter->pBuf1, nBuf1);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pBlkInfo->szBlock += nBuf1;
}
// update info // update info
if (toLast) { if (toLast) {
......
...@@ -1793,90 +1793,3 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol ...@@ -1793,90 +1793,3 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
_exit: _exit:
return code; return code;
} }
#if 0
int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg) {
int32_t code = 0;
ASSERT(pBlockData->nRow > 0);
pDiskData->cmprAlg = cmprAlg;
pDiskData->nRow = pBlockData->nRow;
pDiskData->suid = pBlockData->suid;
pDiskData->uid = pBlockData->uid;
pDiskData->szUid = 0;
pDiskData->szVer = 0;
pDiskData->szKey = 0;
taosArrayClear(pDiskData->aBlockCol);
pDiskData->nBuf = 0;
{
pDiskData->ppKey = tDiskDataAllocBuf(pDiskData);
if (pDiskData->ppKey == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
int32_t n = 0;
// uid
if (pDiskData->uid == 0) {
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
cmprAlg, pDiskData->ppKey, n, &pDiskData->szUid, NULL);
if (code) goto _exit;
} else {
pDiskData->szUid = 0;
}
n += pDiskData->szUid;
// version
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
cmprAlg, pDiskData->ppKey, n, &pDiskData->szVer, NULL);
if (code) goto _exit;
n += pDiskData->szVer;
// tskey
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
cmprAlg, pDiskData->ppKey, &pDiskData->szKey, NULL);
if (code) goto _exit;
}
// columns
int32_t offset = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
if (pColData->flag == HAS_NONE) continue;
SBlockCol blockCol = {.cid = pColData->cid,
.type = pColData->type,
.smaOn = pColData->smaOn,
.flag = pColData->flag,
.szOrigin = pColData->nData};
if (pColData->flag != HAS_NULL) {
// alloc a buffer
blockCol.ppData = tDiskDataAllocBuf(pDiskData);
if (blockCol.ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// compress
code = tsdbCmprColData(pColData, cmprAlg, &blockCol, NULL);
if (code) goto _exit;
// update offset
blockCol.offset = offset;
offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue;
}
if (taosArrayPush(pDiskData->aBlockCol, &blockCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
_exit:
return code;
}
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册