提交 2d503135 编写于 作者: H Hongze Cheng

more work

上级 530c3a55
...@@ -63,6 +63,7 @@ typedef struct SRowMerger SRowMerger; ...@@ -63,6 +63,7 @@ typedef struct SRowMerger SRowMerger;
typedef struct STsdbReadSnap STsdbReadSnap; typedef struct STsdbReadSnap STsdbReadSnap;
typedef struct SBlockInfo SBlockInfo; typedef struct SBlockInfo SBlockInfo;
typedef struct SSmaInfo SSmaInfo; typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol;
#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512 #define TSDB_FHDR_SIZE 512
...@@ -181,7 +182,8 @@ int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); ...@@ -181,7 +182,8 @@ int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut, int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut,
int32_t *szOut, uint8_t **ppBuf); int32_t *szOut, uint8_t **ppBuf);
int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf); int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppOut, int8_t nOut,
uint8_t **ppBuf);
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
// SMemTable // SMemTable
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
...@@ -400,7 +402,7 @@ struct SMapData { ...@@ -400,7 +402,7 @@ struct SMapData {
uint8_t *pData; uint8_t *pData;
}; };
typedef struct { struct SBlockCol {
int16_t cid; int16_t cid;
int8_t type; int8_t type;
int8_t smaOn; int8_t smaOn;
...@@ -410,7 +412,7 @@ typedef struct { ...@@ -410,7 +412,7 @@ typedef struct {
int32_t szOffset; // offset size, 0 only for non-variant-length type int32_t szOffset; // offset size, 0 only for non-variant-length type
int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE) int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE)
int32_t offset; int32_t offset;
} SBlockCol; };
struct SBlockInfo { struct SBlockInfo {
int64_t offset; // block data offset int64_t offset; // block data offset
...@@ -600,6 +602,7 @@ struct SDataFWriter { ...@@ -600,6 +602,7 @@ struct SDataFWriter {
uint8_t *pBuf1; uint8_t *pBuf1;
uint8_t *pBuf2; uint8_t *pBuf2;
uint8_t *pBuf3;
}; };
struct STsdbReadSnap { struct STsdbReadSnap {
......
...@@ -1414,7 +1414,6 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ...@@ -1414,7 +1414,6 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
code = tDiskDataInit(&pWriter->dData);
if (code) goto _err; if (code) goto _err;
pWriter->pTsdb = pTsdb; pWriter->pTsdb = pTsdb;
pWriter->wSet = (SDFileSet){.diskId = pSet->diskId, pWriter->wSet = (SDFileSet){.diskId = pSet->diskId,
...@@ -1594,9 +1593,9 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { ...@@ -1594,9 +1593,9 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
goto _err; goto _err;
} }
tDiskDataClear(&(*ppWriter)->dData);
tFree((*ppWriter)->pBuf1); tFree((*ppWriter)->pBuf1);
tFree((*ppWriter)->pBuf2); tFree((*ppWriter)->pBuf2);
tFree((*ppWriter)->pBuf3);
taosMemoryFree(*ppWriter); taosMemoryFree(*ppWriter);
_exit: _exit:
*ppWriter = NULL; *ppWriter = NULL;
...@@ -1905,11 +1904,14 @@ _err: ...@@ -1905,11 +1904,14 @@ _err:
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast) { int8_t cmprAlg, int8_t toLast) {
int32_t code = 0; int32_t code = 0;
TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
ASSERT(pBlockData->nRow > 0); ASSERT(pBlockData->nRow > 0);
pBlkInfo->offset = toLast ? pWriter->fLast.size : pWriter->fData.size;
pBlkInfo->szBlock = 0;
pBlkInfo->szKey = 0;
// ================= DATA ==================== // ================= DATA ====================
SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT,
.suid = pBlockData->suid, .suid = pBlockData->suid,
...@@ -1923,23 +1925,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1923,23 +1925,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
goto _err; goto _err;
} }
// uid // encode =================
if (pBlockData->uid == 0) { int32_t nBuf1 = 0;
ASSERT(toLast);
code = tsdbCmprData();
if (code) goto _err;
}
// version
code = tsdbCmprData();
if (code) goto _err;
// ts
code = tsdbCmprData();
if (code) goto _err;
// columns
int32_t offset = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
...@@ -1954,15 +1941,71 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1954,15 +1941,71 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
.szOrigin = pColData->nData}; .szOrigin = pColData->nData};
if (pColData->flag != HAS_NULL) { if (pColData->flag != HAS_NULL) {
code = tsdbCmprColData(pColData, cmprAlg, &blockCol, &pWriter->pBuf1, nBuf1, &pWriter->pBuf3);
if (code) goto _err;
blockCol.offset = nBuf1;
nBuf1 = nBuf1 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM);
} }
if (taosArrayPush(aBlockCol, &blockCol) == NULL) { if (taosArrayPush(aBlockCol, &blockCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
hdr.szBlkCol += tPutBlockCol(NULL, &blockCol);
} }
// write // (uid + version + tskey + aBlockCol)
if (pBlockData->uid == 0) {
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg,
&pWriter->pBuf2, 0, &hdr.szUid, &pWriter->pBuf3);
if (code) goto _err;
}
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
cmprAlg, &pWriter->pBuf2, hdr.szUid, &hdr.szVer, &pWriter->pBuf3);
if (code) goto _err;
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
cmprAlg, &pWriter->pBuf2, hdr.szUid + hdr.szVer, &hdr.szKey, &pWriter->pBuf3);
if (code) goto _err;
pBlkInfo->szKey = tPutDiskDataHdr(NULL, &hdr);
code = tRealloc(&pWriter->pBuf3, pBlkInfo->szKey);
if (code) goto _err;
tPutDiskDataHdr(pWriter->pBuf3, &hdr);
TSCKSUM cksm = taosCalcChecksum(0, pWriter->pBuf3, pBlkInfo->szKey);
// write =================
TdFilePtr pFD = toLast ? pWriter->pLastFD : pWriter->pDataFD;
// hdr
int64_t n = taosWriteFile(pFD, pWriter->pBuf3, pBlkInfo->szKey);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// uid + version + tskey + (CKSM)
taosCalcChecksumAppend(cksm, pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM));
n = taosWriteFile(pFD, pWriter->pBuf2, hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM));
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pBlkInfo->szKey = pBlkInfo->szKey + hdr.szUid + hdr.szVer + hdr.szKey + sizeof(TSCKSUM);
// aBlockCol
// colmns
// update info
if (toLast) {
pWriter->fLast.size += pBlkInfo->szBlock;
} else {
pWriter->fData.size += pBlkInfo->szBlock;
}
// ================= SMA ==================== // ================= SMA ====================
if (pSmaInfo) { if (pSmaInfo) {
...@@ -1971,10 +2014,12 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1971,10 +2014,12 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
} }
_exit: _exit:
taosArrayDestroy(aBlockCol);
return code; return code;
_err: _err:
tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d tsdb write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
taosArrayDestroy(aBlockCol);
return code; return code;
} }
......
...@@ -1373,11 +1373,11 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock ...@@ -1373,11 +1373,11 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock
c = tsdbKeyCmprFn(&TSDBROW_KEY(&row1), &TSDBROW_KEY(&row2)); c = tsdbKeyCmprFn(&TSDBROW_KEY(&row1), &TSDBROW_KEY(&row2));
if (c < 0) { if (c < 0) {
code = tBlockDataAppendRow(pBlockData, &row1, NULL); // code = tBlockDataAppendRow(pBlockData, &row1, NULL);
if (code) goto _exit; if (code) goto _exit;
iRow1++; iRow1++;
} else if (c > 0) { } else if (c > 0) {
code = tBlockDataAppendRow(pBlockData, &row2, NULL); // code = tBlockDataAppendRow(pBlockData, &row2, NULL);
if (code) goto _exit; if (code) goto _exit;
iRow2++; iRow2++;
} else { } else {
...@@ -1387,14 +1387,14 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock ...@@ -1387,14 +1387,14 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock
while (iRow1 < nRow1) { while (iRow1 < nRow1) {
row1 = tsdbRowFromBlockData(pBlockData1, iRow1); row1 = tsdbRowFromBlockData(pBlockData1, iRow1);
code = tBlockDataAppendRow(pBlockData, &row1, NULL); // code = tBlockDataAppendRow(pBlockData, &row1, NULL);
if (code) goto _exit; if (code) goto _exit;
iRow1++; iRow1++;
} }
while (iRow2 < nRow2) { while (iRow2 < nRow2) {
row2 = tsdbRowFromBlockData(pBlockData2, iRow2); row2 = tsdbRowFromBlockData(pBlockData2, iRow2);
code = tBlockDataAppendRow(pBlockData, &row2, NULL); // code = tBlockDataAppendRow(pBlockData, &row2, NULL);
if (code) goto _exit; if (code) goto _exit;
iRow2++; iRow2++;
} }
...@@ -1751,43 +1751,44 @@ _exit: ...@@ -1751,43 +1751,44 @@ _exit:
return code; return code;
} }
int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf) { int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppOut, int8_t nOut,
uint8_t **ppBuf) {
int32_t code = 0; int32_t code = 0;
int32_t n = 0; pBlockCol->szBitmap = 0;
pBlockCol->szOffset = 0;
pBlockCol->szValue = 0;
int32_t size = 0;
// bitmap // bitmap
if (pColData->flag != HAS_VALUE) { if (pColData->flag != HAS_VALUE) {
code = tsdbCmprData(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg, code = tsdbCmprData(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg, ppOut,
pBlockCol->ppData, n, &pBlockCol->szBitmap, ppBuf); nOut + size, &pBlockCol->szBitmap, ppBuf);
if (code) goto _exit; if (code) goto _exit;
} else {
pBlockCol->szBitmap = 0;
} }
n += pBlockCol->szBitmap; size += pBlockCol->szBitmap;
// offset // offset
if (IS_VAR_DATA_TYPE(pColData->type)) { if (IS_VAR_DATA_TYPE(pColData->type)) {
code = tsdbCmprData((uint8_t *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg, code = tsdbCmprData((uint8_t *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg,
pBlockCol->ppData, n, &pBlockCol->szOffset, ppBuf); ppOut, nOut + size, &pBlockCol->szOffset, ppBuf);
if (code) goto _exit; if (code) goto _exit;
} else {
pBlockCol->szOffset = 0;
} }
n += pBlockCol->szOffset; size += pBlockCol->szOffset;
// value // value
if (pColData->flag != (HAS_NULL | HAS_NONE)) { if (pColData->flag != (HAS_NULL | HAS_NONE)) {
code = tsdbCmprData((uint8_t *)pColData->pData, pColData->nData, pColData->type, cmprAlg, pBlockCol->ppData, n, code = tsdbCmprData((uint8_t *)pColData->pData, pColData->nData, pColData->type, cmprAlg, ppOut, nOut + size,
&pBlockCol->szValue, ppBuf); &pBlockCol->szValue, ppBuf);
if (code) goto _exit; if (code) goto _exit;
} else {
pBlockCol->szValue = 0;
} }
n += pBlockCol->szValue; size += pBlockCol->szValue;
// checksum // checksum
n += sizeof(TSCKSUM); size += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, *ppBuf, n); code = tRealloc(ppOut, nOut + size);
if (code) goto _exit;
taosCalcChecksumAppend(0, *ppOut + nOut, size);
_exit: _exit:
return code; return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册