提交 875566d3 编写于 作者: H Hongze Cheng

adjust more

上级 47cddbfe
...@@ -152,6 +152,8 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **p ...@@ -152,6 +152,8 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **p
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[],
int32_t aBufN[]);
// SDiskDataHdr // SDiskDataHdr
int32_t tPutDiskDataHdr(uint8_t *p, void *ph); int32_t tPutDiskDataHdr(uint8_t *p, void *ph);
int32_t tGetDiskDataHdr(uint8_t *p, void *ph); int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
......
...@@ -1424,111 +1424,38 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1424,111 +1424,38 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
pBlkInfo->szBlock = 0; pBlkInfo->szBlock = 0;
pBlkInfo->szKey = 0; pBlkInfo->szKey = 0;
// ================= DATA ==================== int32_t aBufN[4] = {0};
SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, code = tCmprBlockData(pBlockData, cmprAlg, NULL, NULL, pWriter->aBuf, aBufN);
.fmtVer = 0,
.suid = pBlockData->suid,
.uid = pBlockData->uid,
.nRow = pBlockData->nRow,
.cmprAlg = cmprAlg};
// encode =================
// columns AND SBlockCol
int32_t nBuf1 = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
ASSERT(pColData->flag);
if (pColData->flag == HAS_NONE) continue;
SBlockCol blockCol = {.cid = pColData->cid,
.type = pColData->type,
.smaOn = pColData->smaOn,
.flag = pColData->flag,
.szOrigin = pColData->nData};
if (pColData->flag != HAS_NULL) {
code = tsdbCmprColData(pColData, cmprAlg, &blockCol, &pWriter->aBuf[0], nBuf1, &pWriter->aBuf[2]);
if (code) goto _err;
blockCol.offset = nBuf1;
nBuf1 = nBuf1 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM);
}
code = tRealloc(&pWriter->aBuf[1], hdr.szBlkCol + tPutBlockCol(NULL, &blockCol));
if (code) goto _err;
hdr.szBlkCol += tPutBlockCol(pWriter->aBuf[1] + hdr.szBlkCol, &blockCol);
}
int32_t nBuf2 = 0;
if (hdr.szBlkCol > 0) {
nBuf2 = hdr.szBlkCol + sizeof(TSCKSUM);
code = tRealloc(&pWriter->aBuf[1], nBuf2);
if (code) goto _err;
taosCalcChecksumAppend(0, pWriter->aBuf[1], nBuf2);
}
// uid + version + tskey
int32_t nBuf3 = 0;
if (pBlockData->uid == 0) {
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg,
&pWriter->aBuf[2], nBuf3, &hdr.szUid, &pWriter->aBuf[3]);
if (code) goto _err;
}
nBuf3 += hdr.szUid;
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
cmprAlg, &pWriter->aBuf[2], nBuf3, &hdr.szVer, &pWriter->aBuf[3]);
if (code) goto _err;
nBuf3 += hdr.szVer;
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
cmprAlg, &pWriter->aBuf[2], nBuf3, &hdr.szKey, &pWriter->aBuf[3]);
if (code) goto _err;
nBuf3 += hdr.szKey;
nBuf3 += sizeof(TSCKSUM);
code = tRealloc(&pWriter->aBuf[2], nBuf3);
if (code) goto _err;
// hdr
int32_t nBuf4 = tPutDiskDataHdr(NULL, &hdr);
code = tRealloc(&pWriter->aBuf[3], nBuf4);
if (code) goto _err; if (code) goto _err;
tPutDiskDataHdr(pWriter->aBuf[3], &hdr);
taosCalcChecksumAppend(taosCalcChecksum(0, pWriter->aBuf[3], nBuf4), pWriter->aBuf[2], nBuf3);
// write ================= // write =================
TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD; TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
pBlkInfo->szKey = nBuf4 + nBuf3; pBlkInfo->szKey = aBufN[3] + aBufN[2];
pBlkInfo->szBlock = nBuf1 + nBuf2 + nBuf3 + nBuf4; pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
int64_t n = taosWriteFile(pFD, pWriter->aBuf[3], nBuf4); int64_t n = taosWriteFile(pFD, pWriter->aBuf[3], aBufN[3]);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
n = taosWriteFile(pFD, pWriter->aBuf[2], nBuf3); n = taosWriteFile(pFD, pWriter->aBuf[2], aBufN[2]);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (nBuf2) { if (aBufN[1]) {
n = taosWriteFile(pFD, pWriter->aBuf[1], nBuf2); n = taosWriteFile(pFD, pWriter->aBuf[1], aBufN[1]);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
} }
if (nBuf1) { if (aBufN[0]) {
n = taosWriteFile(pFD, pWriter->aBuf[0], nBuf1); n = taosWriteFile(pFD, pWriter->aBuf[0], aBufN[0]);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
......
...@@ -1500,7 +1500,8 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD ...@@ -1500,7 +1500,8 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD
*ppColData = NULL; *ppColData = NULL;
} }
int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[]) { int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[],
int32_t aBufN[]) {
int32_t code = 0; int32_t code = 0;
SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT,
...@@ -1512,7 +1513,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, ...@@ -1512,7 +1513,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut,
// encode ================= // encode =================
// columns AND SBlockCol // columns AND SBlockCol
int32_t nBuf0 = 0; aBufN[0] = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
...@@ -1527,11 +1528,11 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, ...@@ -1527,11 +1528,11 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut,
.szOrigin = pColData->nData}; .szOrigin = pColData->nData};
if (pColData->flag != HAS_NULL) { if (pColData->flag != HAS_NULL) {
code = tsdbCmprColData(pColData, cmprAlg, &blockCol, &aBuf[0], nBuf0, &aBuf[2]); code = tsdbCmprColData(pColData, cmprAlg, &blockCol, &aBuf[0], aBufN[0], &aBuf[2]);
if (code) goto _exit; if (code) goto _exit;
blockCol.offset = nBuf0; blockCol.offset = aBufN[0];
nBuf0 = nBuf0 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); aBufN[0] = aBufN[0] + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM);
} }
code = tRealloc(&aBuf[1], hdr.szBlkCol + tPutBlockCol(NULL, &blockCol)); code = tRealloc(&aBuf[1], hdr.szBlkCol + tPutBlockCol(NULL, &blockCol));
...@@ -1539,59 +1540,59 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, ...@@ -1539,59 +1540,59 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut,
hdr.szBlkCol += tPutBlockCol(aBuf[1] + hdr.szBlkCol, &blockCol); hdr.szBlkCol += tPutBlockCol(aBuf[1] + hdr.szBlkCol, &blockCol);
} }
int32_t nBuf1 = 0; aBufN[1] = 0;
if (hdr.szBlkCol > 0) { if (hdr.szBlkCol > 0) {
nBuf1 = hdr.szBlkCol + sizeof(TSCKSUM); aBufN[1] = hdr.szBlkCol + sizeof(TSCKSUM);
code = tRealloc(&aBuf[1], nBuf1); code = tRealloc(&aBuf[1], aBufN[1]);
if (code) goto _exit; if (code) goto _exit;
taosCalcChecksumAppend(0, aBuf[1], nBuf1); taosCalcChecksumAppend(0, aBuf[1], aBufN[1]);
} }
// uid + version + tskey // uid + version + tskey
int32_t nBuf2 = 0; aBufN[2] = 0;
if (pBlockData->uid == 0) { if (pBlockData->uid == 0) {
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg,
&aBuf[2], nBuf2, &hdr.szUid, &aBuf[3]); &aBuf[2], aBufN[2], &hdr.szUid, &aBuf[3]);
if (code) goto _exit; if (code) goto _exit;
} }
nBuf2 += hdr.szUid; aBufN[2] += hdr.szUid;
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
cmprAlg, &aBuf[2], nBuf2, &hdr.szVer, &aBuf[3]); cmprAlg, &aBuf[2], aBufN[2], &hdr.szVer, &aBuf[3]);
if (code) goto _exit; if (code) goto _exit;
nBuf2 += hdr.szVer; aBufN[2] += hdr.szVer;
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
cmprAlg, &aBuf[2], nBuf2, &hdr.szKey, &aBuf[3]); cmprAlg, &aBuf[2], aBufN[2], &hdr.szKey, &aBuf[3]);
if (code) goto _exit; if (code) goto _exit;
nBuf2 += hdr.szKey; aBufN[2] += hdr.szKey;
nBuf2 += sizeof(TSCKSUM); aBufN[2] += sizeof(TSCKSUM);
code = tRealloc(&aBuf[2], nBuf2); code = tRealloc(&aBuf[2], aBufN[2]);
if (code) goto _exit; if (code) goto _exit;
// hdr // hdr
int32_t nBuf3 = tPutDiskDataHdr(NULL, &hdr); aBufN[3] = tPutDiskDataHdr(NULL, &hdr);
code = tRealloc(&aBuf[3], nBuf3); code = tRealloc(&aBuf[3], aBufN[3]);
if (code) goto _exit; if (code) goto _exit;
tPutDiskDataHdr(aBuf[3], &hdr); tPutDiskDataHdr(aBuf[3], &hdr);
taosCalcChecksumAppend(taosCalcChecksum(0, aBuf[3], nBuf3), aBuf[2], nBuf2); taosCalcChecksumAppend(taosCalcChecksum(0, aBuf[3], aBufN[3]), aBuf[2], aBufN[2]);
// aggragate // aggragate
if (ppOut) { if (ppOut) {
*szOut = nBuf0 + nBuf1 + nBuf2 + nBuf3; *szOut = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3];
code = tRealloc(ppOut, *szOut); code = tRealloc(ppOut, *szOut);
if (code) goto _exit; if (code) goto _exit;
memcpy(*ppOut, aBuf[3], nBuf3); memcpy(*ppOut, aBuf[3], aBufN[3]);
memcpy(*ppOut + nBuf3, aBuf[2], nBuf2); memcpy(*ppOut + aBufN[3], aBuf[2], aBufN[2]);
if (nBuf1) { if (aBufN[1]) {
memcpy(*ppOut + nBuf3 + nBuf2, aBuf[1], nBuf1); memcpy(*ppOut + aBufN[3] + aBufN[2], aBuf[1], aBufN[1]);
} }
if (nBuf0) { if (aBufN[0]) {
memcpy(*ppOut + nBuf3 + nBuf2 + nBuf1, aBuf[0], nBuf0); memcpy(*ppOut + aBufN[3] + aBufN[2] + aBufN[1], aBuf[0], aBufN[0]);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册