提交 0929326c 编写于 作者: H Hongze Cheng

more code

上级 f870f7e4
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
typedef struct SDiskDataBuilder SDiskDataBuilder; typedef struct SDiskDataBuilder SDiskDataBuilder;
typedef struct SDiskColBuilder SDiskColBuilder; typedef struct SDiskColBuilder SDiskColBuilder;
typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData; typedef struct SDiskData SDiskData;
struct SDiskColBuilder { struct SDiskColBuilder {
...@@ -31,6 +32,34 @@ struct SDiskColBuilder { ...@@ -31,6 +32,34 @@ struct SDiskColBuilder {
SCompressor *pValC; SCompressor *pValC;
}; };
struct SDiskDataBuilder {
int64_t suid;
int64_t uid;
int32_t nRow;
uint8_t cmprAlg;
SCompressor *pUidC;
SCompressor *pVerC;
SCompressor *pKeyC;
int32_t nBuilder;
SArray *aBuilder;
uint8_t *aBuf[2];
};
struct SDiskCol {
SBlockCol bCol;
const uint8_t *pBit;
const uint8_t *pOff;
const uint8_t *pVal;
};
struct SDiskData {
SDiskDataHdr hdr;
const uint8_t *pUid;
const uint8_t *pVer;
const uint8_t *pKey;
SArray *aDiskCol; // SArray<SDiskCol>
};
// SDiskColBuilder ================================================ // SDiskColBuilder ================================================
static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg) { static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
...@@ -71,28 +100,40 @@ static int32_t tDiskColClear(SDiskColBuilder *pBuilder) { ...@@ -71,28 +100,40 @@ static int32_t tDiskColClear(SDiskColBuilder *pBuilder) {
return code; return code;
} }
static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SBlockCol *pBlockCol, const uint8_t **ppData) { static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SDiskCol *pDiskCol) {
int32_t code = 0; int32_t code = 0;
ASSERT(pBuilder->flag && pBuilder->flag != HAS_NONE); ASSERT(pBuilder->flag && pBuilder->flag != HAS_NONE);
if (pBuilder->flag == HAS_NULL) { pDiskCol->bCol = (SBlockCol){.cid = pBuilder->cid,
return code; .type = pBuilder->type,
} .smaOn = 1, /* todo */
.flag = pBuilder->flag,
// bitmap (todo) .szOrigin = 0, // todo
.szBitmap = 0, // todo
.szOffset = 0,
.szValue = 0, // todo
.offset = 0};
pDiskCol->pBit = NULL;
pDiskCol->pOff = NULL;
pDiskCol->pVal = NULL;
if (pBuilder->flag == HAS_NULL) return code;
// BITMAP
if (pBuilder->flag != HAS_VALUE) { if (pBuilder->flag != HAS_VALUE) {
// TODO
} }
// offset (todo) // OFFSET
if (IS_VAR_DATA_TYPE(pBuilder->type)) { if (IS_VAR_DATA_TYPE(pBuilder->type)) {
code = tCompGen(pBuilder->pOffC, NULL /* todo */, NULL /* todo */); code = tCompGen(pBuilder->pOffC, &pDiskCol->pOff, &pDiskCol->bCol.szOffset);
if (code) return code; if (code) return code;
} }
// value (todo) // VALUE
if (pBuilder->flag != (HAS_NULL | HAS_NONE)) { if (pBuilder->flag != (HAS_NULL | HAS_NONE)) {
code = tCompGen(pBuilder->pValC, NULL /* todo */, NULL /* todo */); code = tCompGen(pBuilder->pValC, &pDiskCol->pVal, &pDiskCol->bCol.szValue);
if (code) return code; if (code) return code;
} }
...@@ -368,19 +409,6 @@ static int32_t (*tDiskColAddValImpl[])(SDiskColBuilder *pBuilder, SColVal *pColV ...@@ -368,19 +409,6 @@ static int32_t (*tDiskColAddValImpl[])(SDiskColBuilder *pBuilder, SColVal *pColV
}; };
// SDiskDataBuilder ================================================ // SDiskDataBuilder ================================================
struct SDiskDataBuilder {
int64_t suid;
int64_t uid;
int32_t nRow;
uint8_t cmprAlg;
SCompressor *pUidC;
SCompressor *pVerC;
SCompressor *pKeyC;
int32_t nDiskCol;
SArray *aDiskColBuilder;
uint8_t *aBuf[2];
};
int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg) { int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
...@@ -410,32 +438,32 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB ...@@ -410,32 +438,32 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
code = tCompressorReset(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg); code = tCompressorReset(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg);
if (code) return code; if (code) return code;
if (pBuilder->aDiskColBuilder == NULL) { if (pBuilder->aBuilder == NULL) {
pBuilder->aDiskColBuilder = taosArrayInit(pTSchema->numOfCols - 1, sizeof(SDiskColBuilder)); pBuilder->aBuilder = taosArrayInit(pTSchema->numOfCols - 1, sizeof(SDiskColBuilder));
if (pBuilder->aDiskColBuilder == NULL) { if (pBuilder->aBuilder == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code; return code;
} }
} }
pBuilder->nDiskCol = 0; pBuilder->nBuilder = 0;
for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
STColumn *pTColumn = &pTSchema->columns[iCol]; STColumn *pTColumn = &pTSchema->columns[iCol];
if (pBuilder->nDiskCol >= taosArrayGetSize(pBuilder->aDiskColBuilder)) { if (pBuilder->nBuilder >= taosArrayGetSize(pBuilder->aBuilder)) {
SDiskColBuilder dc = (SDiskColBuilder){0}; SDiskColBuilder dc = (SDiskColBuilder){0};
if (taosArrayPush(pBuilder->aDiskColBuilder, &dc) == NULL) { if (taosArrayPush(pBuilder->aBuilder, &dc) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code; return code;
} }
} }
SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aDiskColBuilder, pBuilder->nDiskCol); SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, pBuilder->nBuilder);
code = tDiskColInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg); code = tDiskColInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg);
if (code) return code; if (code) return code;
pBuilder->nDiskCol++; pBuilder->nBuilder++;
} }
return code; return code;
...@@ -448,13 +476,12 @@ int32_t tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) { ...@@ -448,13 +476,12 @@ int32_t tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) {
if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC); if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC);
if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC); if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC);
if (pBuilder->aDiskColBuilder) { if (pBuilder->aBuilder) {
for (int32_t iDiskColBuilder = 0; iDiskColBuilder < taosArrayGetSize(pBuilder->aDiskColBuilder); for (int32_t iDiskColBuilder = 0; iDiskColBuilder < taosArrayGetSize(pBuilder->aBuilder); iDiskColBuilder++) {
iDiskColBuilder++) { SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iDiskColBuilder);
SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aDiskColBuilder, iDiskColBuilder);
tDiskColClear(pDiskColBuilder); tDiskColClear(pDiskColBuilder);
} }
taosArrayDestroy(pBuilder->aDiskColBuilder); taosArrayDestroy(pBuilder->aBuilder);
} }
for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) { for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) {
tFree(pBuilder->aBuf[iBuf]); tFree(pBuilder->aBuf[iBuf]);
...@@ -495,8 +522,8 @@ int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSche ...@@ -495,8 +522,8 @@ int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSche
tRowIterInit(&iter, pRow, pTSchema); tRowIterInit(&iter, pRow, pTSchema);
SColVal *pColVal = tRowIterNext(&iter); SColVal *pColVal = tRowIterNext(&iter);
for (int32_t iDiskColBuilder = 0; iDiskColBuilder < pBuilder->nDiskCol; iDiskColBuilder++) { for (int32_t iDiskColBuilder = 0; iDiskColBuilder < pBuilder->nBuilder; iDiskColBuilder++) {
SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aDiskColBuilder, iDiskColBuilder); SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iDiskColBuilder);
while (pColVal && pColVal->cid < pDiskColBuilder->cid) { while (pColVal && pColVal->cid < pDiskColBuilder->cid) {
pColVal = tRowIterNext(&iter); pColVal = tRowIterNext(&iter);
...@@ -523,102 +550,74 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) { ...@@ -523,102 +550,74 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) {
ASSERT(pBuilder->nRow); ASSERT(pBuilder->nRow);
SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, // reset SDiskData
.fmtVer = 0, pDiskData->hdr = (SDiskDataHdr){.delimiter = TSDB_FILE_DLMT,
.suid = pBuilder->suid, .fmtVer = 0,
.uid = pBuilder->uid, .suid = pBuilder->suid,
.szUid = 0, .uid = pBuilder->uid,
.szVer = 0, .szUid = 0,
.szKey = 0, .szVer = 0,
.szBlkCol = 0, .szKey = 0,
.nRow = pBuilder->nRow, .szBlkCol = 0,
.cmprAlg = pBuilder->cmprAlg}; .nRow = pBuilder->nRow,
.cmprAlg = pBuilder->cmprAlg};
pDiskData->pUid = NULL;
pDiskData->pVer = NULL;
pDiskData->pKey = NULL;
if (pDiskData->aDiskCol) {
taosArrayClear(pDiskData->aDiskCol);
} else {
pDiskData->aDiskCol = taosArrayInit(pBuilder->nBuilder, sizeof(SDiskCol));
if (pDiskData->aDiskCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
}
// UID // UID
const uint8_t *pUid = NULL;
if (pBuilder->uid == 0) { if (pBuilder->uid == 0) {
code = tCompGen(pBuilder->pUidC, &pUid, &hdr.szUid); code = tCompGen(pBuilder->pUidC, &pDiskData->pUid, &pDiskData->hdr.szUid);
if (code) return code; if (code) return code;
} }
// VERSION // VERSION
const uint8_t *pVer = NULL; code = tCompGen(pBuilder->pVerC, &pDiskData->pVer, &pDiskData->hdr.szVer);
code = tCompGen(pBuilder->pVerC, &pVer, &hdr.szVer);
if (code) return code; if (code) return code;
// TSKEY // TSKEY
const uint8_t *pKey = NULL; code = tCompGen(pBuilder->pKeyC, &pDiskData->pKey, &pDiskData->hdr.szKey);
code = tCompGen(pBuilder->pKeyC, &pKey, &hdr.szKey);
if (code) return code; if (code) return code;
int32_t offset = 0; int32_t offset = 0;
for (int32_t iDiskColBuilder = 0; iDiskColBuilder < pBuilder->nDiskCol; iDiskColBuilder++) { for (int32_t iBuilder = 0; iBuilder < pBuilder->nBuilder; iBuilder++) {
SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aDiskColBuilder, iDiskColBuilder); SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder);
if (pDCBuilder->flag == HAS_NONE) continue;
if (pDiskColBuilder->flag == HAS_NONE) continue; SDiskCol dCol;
code = tGnrtDiskCol(pDiskColBuilder, NULL, NULL); code = tGnrtDiskCol(pDCBuilder, &dCol);
if (code) return code; if (code) return code;
SBlockCol bCol = {.cid = pDiskColBuilder->cid, dCol.bCol.offset = offset;
.type = pDiskColBuilder->type, offset = offset + dCol.bCol.szBitmap + dCol.bCol.szOffset + dCol.bCol.szValue;
// .smaOn = ,
.flag = pDiskColBuilder->flag,
// .szOrigin =
// .szBitmap =
// .szOffset =
// .szValue =
.offset = offset};
hdr.szBlkCol += tPutBlockCol(NULL, &bCol);
offset = offset + bCol.szBitmap + bCol.szOffset + bCol.szValue;
}
#if 0 if (taosArrayPush(pDiskData->aDiskCol, &dCol) == NULL) {
*nData = tPutDiskDataHdr(NULL, &hdr) + hdr.szUid + hdr.szVer + hdr.szKey + hdr.szBlkCol + offset; code = TSDB_CODE_OUT_OF_MEMORY;
code = tRealloc(&pBuilder->aBuf[0], *nData); return code;
if (code) return code; }
*ppData = pBuilder->aBuf[0];
int32_t n = 0; if (pDCBuilder->flag != HAS_NULL) {
n += tPutDiskDataHdr(pBuilder->aBuf[0] + n, &hdr); pDiskData->hdr.szBlkCol += tPutBlockCol(NULL, &dCol.bCol);
if (hdr.szUid) { }
memcpy(pBuilder->aBuf[0] + n, pUid, hdr.szUid);
n += hdr.szUid;
}
memcpy(pBuilder->aBuf[0] + n, pVer, hdr.szVer);
n += hdr.szVer;
memcpy(pBuilder->aBuf[0] + n, pKey, hdr.szKey);
n += hdr.szKey;
for (int32_t iDiskColBuilder = 0; iDiskColBuilder < pBuilder->nDiskCol; iDiskColBuilder++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskColBuilder, iDiskColBuilder);
n += tPutBlockCol(pBuilder->aBuf[0] + n, NULL /*pDiskCol->bCol (todo) */);
}
for (int32_t iDiskColBuilder = 0; iDiskColBuilder < pBuilder->nDiskCol; iDiskColBuilder++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskColBuilder, iDiskColBuilder);
// memcpy(pDiskData->aBuf[0] + n, NULL, );
// n += 0;
} }
#endif
return code; return code;
} }
// SDiskData ================================================ // SDiskData ================================================
struct SDiskData {
SDiskDataHdr hdr;
const uint8_t *pUid;
const uint8_t *pVer;
const uint8_t *pKey;
SArray *aBlockCol;
SArray *aColData;
};
int32_t tDiskDataDestroy(SDiskData *pDiskData) { int32_t tDiskDataDestroy(SDiskData *pDiskData) {
int32_t code = 0; int32_t code = 0;
if (pDiskData->aDiskCol) taosArrayDestroy(pDiskData->aDiskCol);
if (pDiskData->aBlockCol) taosArrayDestroy(pDiskData->aBlockCol);
if (pDiskData->aColData) taosArrayDestroy(pDiskData->aColData);
return code; return code;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册