提交 1bab3c1b 编写于 作者: H Hongze Cheng

optimize more code

上级 11d41db7
......@@ -18,15 +18,20 @@
typedef struct SDiskColBuilder SDiskColBuilder;
struct SDiskColBuilder {
int16_t cid;
int8_t type;
int8_t flag;
uint8_t cmprAlg;
int32_t nVal;
uint8_t *pBitMap;
int32_t offset;
SCompressor *pOffC;
SCompressor *pValC;
int16_t cid;
int8_t type;
uint8_t cmprAlg;
uint8_t calcSma;
int8_t flag;
int32_t nVal;
uint8_t *pBitMap;
int32_t offset;
SCompressor *pOffC;
SCompressor *pValC;
SColumnDataAgg sma;
uint8_t minSet;
uint8_t maxSet;
uint8_t *aBuf[1];
};
struct SDiskDataBuilder {
......@@ -53,36 +58,41 @@ static int32_t tDiskColBuilderDestroy(SDiskColBuilder *pBuilder) {
tFree(pBuilder->pBitMap);
if (pBuilder->pOffC) tCompressorDestroy(pBuilder->pOffC);
if (pBuilder->pValC) tCompressorDestroy(pBuilder->pValC);
for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) {
tFree(pBuilder->aBuf[iBuf]);
}
return code;
}
static int32_t tDiskColBuilderInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg) {
static int32_t tDiskColBuilderInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg,
uint8_t calcSma) {
int32_t code = 0;
pBuilder->cid = cid;
pBuilder->type = type;
pBuilder->flag = 0;
pBuilder->cmprAlg = cmprAlg;
pBuilder->calcSma = IS_VAR_DATA_TYPE(type) ? 0 : calcSma;
pBuilder->flag = 0;
pBuilder->nVal = 0;
pBuilder->offset = 0;
if (IS_VAR_DATA_TYPE(type)) {
if (pBuilder->pOffC == NULL) {
code = tCompressorCreate(&pBuilder->pOffC);
if (code) return code;
}
if (pBuilder->pOffC == NULL && (code = tCompressorCreate(&pBuilder->pOffC))) return code;
code = tCompressorInit(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg);
if (code) return code;
}
if (pBuilder->pValC == NULL) {
code = tCompressorCreate(&pBuilder->pValC);
if (code) return code;
}
if (pBuilder->pValC == NULL && (code = tCompressorCreate(&pBuilder->pValC))) return code;
code = tCompressorInit(pBuilder->pValC, type, cmprAlg);
if (code) return code;
if (pBuilder->calcSma) {
pBuilder->sma = (SColumnDataAgg){.colId = cid};
pBuilder->minSet = 0;
pBuilder->maxSet = 0;
}
return code;
}
......@@ -91,24 +101,31 @@ static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SDiskCol *pDiskCol) {
ASSERT(pBuilder->flag && pBuilder->flag != HAS_NONE);
pDiskCol->bCol = (SBlockCol){.cid = pBuilder->cid,
.type = pBuilder->type,
.smaOn = 1, /* todo */
.flag = pBuilder->flag,
.szOrigin = 0, // todo
.szBitmap = 0, // todo
.szOffset = 0,
.szValue = 0, // todo
.offset = 0};
pDiskCol->pBit = NULL;
pDiskCol->pOff = NULL;
pDiskCol->pVal = NULL;
*pDiskCol = (SDiskCol){(SBlockCol){.cid = pBuilder->cid,
.type = pBuilder->type,
.smaOn = pBuilder->calcSma,
.flag = pBuilder->flag,
.szOrigin = 0, // todo
.szBitmap = 0,
.szOffset = 0,
.szValue = 0,
.offset = 0},
.pBit = NULL, .pOff = NULL, .pVal = NULL, .agg = pBuilder->sma};
if (pBuilder->flag == HAS_NULL) return code;
// BITMAP
if (pBuilder->flag != HAS_VALUE) {
// TODO
int32_t nBit;
if (pBuilder->flag == (HAS_VALUE | HAS_NULL | HAS_NONE)) {
nBit = BIT2_SIZE(pBuilder->nVal);
} else {
nBit = BIT1_SIZE(pBuilder->nVal);
}
pDiskCol->bCol.szBitmap = tsCompressTinyint(pBuilder->pBitMap, nBit, nBit, pBuilder->aBuf[0], 0, pBuilder->cmprAlg,
NULL, 0); // todo: alloc
pDiskCol->pBit = pBuilder->aBuf[0];
}
// OFFSET
......@@ -126,22 +143,114 @@ static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SDiskCol *pDiskCol) {
return code;
}
static int32_t tDiskColAddValue(SDiskColBuilder *pBuilder, SColVal *pColVal) {
static FORCE_INLINE void tDiskColUpdateSmaBool(SDiskColBuilder *pBuilder, SColVal *pColVal) {
// TODO
}
static FORCE_INLINE void tDiskColUpdateSmaTinyint(SDiskColBuilder *pBuilder, SColVal *pColVal) {
int8_t val = *(int8_t *)&pColVal->value.val;
pBuilder->sma.sum += val;
if (!pBuilder->minSet || pBuilder->sma.min > val) pBuilder->sma.min = val;
if (!pBuilder->maxSet || pBuilder->sma.max < val) pBuilder->sma.max = val;
}
static FORCE_INLINE void tDiskColUpdateSmaSmallint(SDiskColBuilder *pBuilder, SColVal *pColVal) {
int16_t val = *(int16_t *)&pColVal->value.val;
pBuilder->sma.sum += val;
if (!pBuilder->minSet || pBuilder->sma.min > val) pBuilder->sma.min = val;
if (!pBuilder->maxSet || pBuilder->sma.max < val) pBuilder->sma.max = val;
}
static FORCE_INLINE void tDiskColUpdateSmaInt(SDiskColBuilder *pBuilder, SColVal *pColVal) {
int32_t val = *(int32_t *)&pColVal->value.val;
pBuilder->sma.sum += val;
if (!pBuilder->minSet || pBuilder->sma.min > val) pBuilder->sma.min = val;
if (!pBuilder->maxSet || pBuilder->sma.max < val) pBuilder->sma.max = val;
}
static FORCE_INLINE void tDiskColUpdateBigint(SDiskColBuilder *pBuilder, SColVal *pColVal) {
int64_t val = *(int64_t *)&pColVal->value.val;
pBuilder->sma.sum += val;
if (!pBuilder->minSet || pBuilder->sma.min > val) pBuilder->sma.min = val;
if (!pBuilder->maxSet || pBuilder->sma.max < val) pBuilder->sma.max = val;
}
static FORCE_INLINE void tDiskColUpdateFloat(SDiskColBuilder *pBuilder, SColVal *pColVal) {
float val = *(float *)&pColVal->value.val;
*(double *)&pBuilder->sma.sum += val;
if (!pBuilder->minSet || *(double *)&pBuilder->sma.min > val) *(double *)&pBuilder->sma.min = val;
if (!pBuilder->maxSet || *(double *)&pBuilder->sma.max < val) *(double *)&pBuilder->sma.max = val;
}
static FORCE_INLINE void tDiskColUpdateDouble(SDiskColBuilder *pBuilder, SColVal *pColVal) {
double val = *(double *)&pColVal->value.val;
*(double *)&pBuilder->sma.sum += val;
if (!pBuilder->minSet || *(double *)&pBuilder->sma.min > val) *(double *)&pBuilder->sma.min = val;
if (!pBuilder->maxSet || *(double *)&pBuilder->sma.max < val) *(double *)&pBuilder->sma.max = val;
}
static FORCE_INLINE void tDiskColUpdateUTinyint(SDiskColBuilder *pBuilder, SColVal *pColVal) {
uint8_t val = *(uint8_t *)&pColVal->value.val;
pBuilder->sma.sum += val;
if (!pBuilder->minSet || pBuilder->sma.min > val) pBuilder->sma.min = val;
if (!pBuilder->maxSet || pBuilder->sma.max < val) pBuilder->sma.max = val;
}
static FORCE_INLINE void tDiskColUpdateUSmallint(SDiskColBuilder *pBuilder, SColVal *pColVal) {
uint16_t val = *(uint16_t *)&pColVal->value.val;
pBuilder->sma.sum += val;
if (!pBuilder->minSet || pBuilder->sma.min > val) pBuilder->sma.min = val;
if (!pBuilder->maxSet || pBuilder->sma.max < val) pBuilder->sma.max = val;
}
static FORCE_INLINE void tDiskColUpdateUInt(SDiskColBuilder *pBuilder, SColVal *pColVal) {
uint32_t val = *(uint32_t *)&pColVal->value.val;
pBuilder->sma.sum += val;
if (!pBuilder->minSet || pBuilder->sma.min > val) pBuilder->sma.min = val;
if (!pBuilder->maxSet || pBuilder->sma.max < val) pBuilder->sma.max = val;
}
static FORCE_INLINE void tDiskColUpdateUBigint(SDiskColBuilder *pBuilder, SColVal *pColVal) {
uint64_t val = *(uint64_t *)&pColVal->value.val;
pBuilder->sma.sum += val;
if (!pBuilder->minSet || pBuilder->sma.min > val) pBuilder->sma.min = val;
if (!pBuilder->maxSet || pBuilder->sma.max < val) pBuilder->sma.max = val;
}
static void (*tDiskColUpdateSmaImpl[])(SDiskColBuilder *pBuilder, SColVal *pColVal) = {
NULL, // TSDB_DATA_TYPE_NULL
tDiskColUpdateSmaBool, // TSDB_DATA_TYPE_BOOL
tDiskColUpdateSmaTinyint, // TSDB_DATA_TYPE_TINYINT
tDiskColUpdateSmaSmallint, // TSDB_DATA_TYPE_SMALLINT
tDiskColUpdateSmaInt, // TSDB_DATA_TYPE_INT
tDiskColUpdateBigint, // TSDB_DATA_TYPE_BIGINT
tDiskColUpdateFloat, // TSDB_DATA_TYPE_FLOAT
tDiskColUpdateDouble, // TSDB_DATA_TYPE_DOUBLE
NULL, // TSDB_DATA_TYPE_VARCHAR
tDiskColUpdateBigint, // TSDB_DATA_TYPE_TIMESTAMP
NULL, // TSDB_DATA_TYPE_NCHAR
tDiskColUpdateUTinyint, // TSDB_DATA_TYPE_UTINYINT
tDiskColUpdateUSmallint, // TSDB_DATA_TYPE_USMALLINT
tDiskColUpdateUInt, // TSDB_DATA_TYPE_UINT
tDiskColUpdateUBigint, // TSDB_DATA_TYPE_UBIGINT
NULL, // TSDB_DATA_TYPE_JSON
NULL, // TSDB_DATA_TYPE_VARBINARY
NULL, // TSDB_DATA_TYPE_DECIMAL
NULL, // TSDB_DATA_TYPE_BLOB
NULL, // TSDB_DATA_TYPE_MEDIUMBLOB
};
static FORCE_INLINE void tDiskColUpdateSma(SDiskColBuilder *pBuilder, SColVal *pColVal) {
if (pColVal->isNone || pColVal->isNull) {
pBuilder->sma.numOfNull++;
} else {
tDiskColUpdateSmaImpl[pBuilder->type](pBuilder, pColVal);
}
}
static int32_t tDiskColPutValue(SDiskColBuilder *pBuilder, SColVal *pColVal) {
int32_t code = 0;
if (IS_VAR_DATA_TYPE(pColVal->type)) {
code = tCompress(pBuilder->pOffC, &pBuilder->offset, sizeof(int32_t));
if (code) goto _exit;
if (code) return code;
pBuilder->offset += pColVal->value.nData;
code = tCompress(pBuilder->pValC, pColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
if (code) return code;
} else {
code = tCompress(pBuilder->pValC, &pColVal->value.val, tDataTypes[pColVal->type].bytes);
if (code) goto _exit;
if (code) return code;
}
_exit:
return code;
}
static int32_t tDiskColAddVal0(SDiskColBuilder *pBuilder, SColVal *pColVal) { // 0
......@@ -153,12 +262,10 @@ static int32_t tDiskColAddVal0(SDiskColBuilder *pBuilder, SColVal *pColVal) { /
pBuilder->flag = HAS_NULL;
} else {
pBuilder->flag = HAS_VALUE;
code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit;
code = tDiskColPutValue(pBuilder, pColVal);
if (code) return code;
}
pBuilder->nVal++;
_exit:
return code;
}
static int32_t tDiskColAddVal1(SDiskColBuilder *pBuilder, SColVal *pColVal) { // HAS_NONE
......@@ -169,7 +276,7 @@ static int32_t tDiskColAddVal1(SDiskColBuilder *pBuilder, SColVal *pColVal) { /
int32_t nBit = BIT1_SIZE(pBuilder->nVal + 1);
code = tRealloc(&pBuilder->pBitMap, nBit);
if (code) goto _exit;
if (code) return code;
memset(pBuilder->pBitMap, 0, nBit);
SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1);
......@@ -182,17 +289,15 @@ static int32_t tDiskColAddVal1(SDiskColBuilder *pBuilder, SColVal *pColVal) { /
SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0});
for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) {
code = tDiskColAddValue(pBuilder, &cv);
if (code) goto _exit;
code = tDiskColPutValue(pBuilder, &cv);
if (code) return code;
}
code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit;
code = tDiskColPutValue(pBuilder, pColVal);
if (code) return code;
}
}
pBuilder->nVal++;
_exit:
return code;
}
static int32_t tDiskColAddVal2(SDiskColBuilder *pBuilder, SColVal *pColVal) { // HAS_NULL
......@@ -216,15 +321,14 @@ static int32_t tDiskColAddVal2(SDiskColBuilder *pBuilder, SColVal *pColVal) { /
SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0});
for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) {
code = tDiskColAddValue(pBuilder, &cv);
code = tDiskColPutValue(pBuilder, &cv);
if (code) goto _exit;
}
code = tDiskColAddValue(pBuilder, pColVal);
code = tDiskColPutValue(pBuilder, pColVal);
if (code) goto _exit;
}
}
pBuilder->nVal++;
_exit:
return code;
......@@ -259,14 +363,13 @@ static int32_t tDiskColAddVal3(SDiskColBuilder *pBuilder, SColVal *pColVal) { /
SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0});
for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) {
code = tDiskColAddValue(pBuilder, &cv);
code = tDiskColPutValue(pBuilder, &cv);
if (code) goto _exit;
}
code = tDiskColAddValue(pBuilder, pColVal);
code = tDiskColPutValue(pBuilder, pColVal);
if (code) goto _exit;
}
pBuilder->nVal++;
_exit:
return code;
......@@ -288,13 +391,12 @@ static int32_t tDiskColAddVal4(SDiskColBuilder *pBuilder, SColVal *pColVal) { /
memset(pBuilder->pBitMap, 255, nBit);
SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0);
code = tDiskColAddValue(pBuilder, pColVal);
code = tDiskColPutValue(pBuilder, pColVal);
if (code) goto _exit;
} else {
code = tDiskColAddValue(pBuilder, pColVal);
code = tDiskColPutValue(pBuilder, pColVal);
if (code) goto _exit;
}
pBuilder->nVal++;
_exit:
return code;
......@@ -326,9 +428,8 @@ static int32_t tDiskColAddVal5(SDiskColBuilder *pBuilder, SColVal *pColVal) { /
SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1);
}
}
code = tDiskColAddValue(pBuilder, pColVal);
code = tDiskColPutValue(pBuilder, pColVal);
if (code) goto _exit;
pBuilder->nVal++;
_exit:
return code;
......@@ -360,9 +461,8 @@ static int32_t tDiskColAddVal6(SDiskColBuilder *pBuilder, SColVal *pColVal) { /
SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1);
}
}
code = tDiskColAddValue(pBuilder, pColVal);
code = tDiskColPutValue(pBuilder, pColVal);
if (code) goto _exit;
pBuilder->nVal++;
_exit:
return code;
......@@ -380,9 +480,8 @@ static int32_t tDiskColAddVal7(SDiskColBuilder *pBuilder, SColVal *pColVal) { /
} else {
SET_BIT2(pBuilder->pBitMap, pBuilder->nVal, 2);
}
code = tDiskColAddValue(pBuilder, pColVal);
code = tDiskColPutValue(pBuilder, pColVal);
if (code) goto _exit;
pBuilder->nVal++;
_exit:
return code;
......@@ -397,6 +496,18 @@ static int32_t (*tDiskColAddValImpl[])(SDiskColBuilder *pBuilder, SColVal *pColV
tDiskColAddVal6, // HAS_VALUE|HAS_NULL
tDiskColAddVal7, // HAS_VALUE|HAS_NULL|HAS_NONE
};
static int32_t tDiskColAddVal(SDiskColBuilder *pBuilder, SColVal *pColVal) {
int32_t code = 0;
if (pBuilder->calcSma) tDiskColUpdateSma(pBuilder, pColVal);
code = tDiskColAddValImpl[pBuilder->type](pBuilder, pColVal);
if (code) return code;
pBuilder->nVal++;
return code;
}
// SDiskDataBuilder ================================================
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder) {
......@@ -477,7 +588,8 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, pBuilder->nBuilder);
code = tDiskColBuilderInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg);
code = tDiskColBuilderInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg,
(calcSma && (pTColumn->flags & COL_SMA_ON)));
if (code) return code;
pBuilder->nBuilder++;
......@@ -528,10 +640,10 @@ int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSche
if (pColVal == NULL || pColVal->cid > pDCBuilder->cid) {
SColVal cv = COL_VAL_NONE(pDCBuilder->cid, pDCBuilder->type);
code = tDiskColAddValImpl[pDCBuilder->flag](pDCBuilder, &cv);
code = tDiskColAddVal(pDCBuilder, &cv);
if (code) return code;
} else {
code = tDiskColAddValImpl[pDCBuilder->flag](pDCBuilder, pColVal);
code = tDiskColAddVal(pDCBuilder, pColVal);
if (code) return code;
pColVal = tRowIterNext(&iter);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册