From 787e166215551d657884a4e063f2fd30ee8e3361 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 26 Sep 2022 17:45:37 +0800 Subject: [PATCH] refact more code --- source/util/src/tcompression.c | 544 ++++++++++++++++++++++++--------- 1 file changed, 403 insertions(+), 141 deletions(-) diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index f1d9b860b8..65e8cabe32 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1159,8 +1159,9 @@ struct SCompressor { int8_t cmprAlg; int8_t autoAlloc; int32_t nVal; - uint8_t *aBuf[2]; - int64_t nBuf[2]; + uint8_t *pBuf; + int32_t nBuf; + uint8_t *aBuf[1]; union { // Timestamp ---- struct { @@ -1193,93 +1194,91 @@ struct SCompressor { // Timestamp ===================================================== static int32_t tCompTimestampStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t code = 0; - pCmprsor->ts_prev_val = 0; - pCmprsor->ts_prev_delta = 0; - pCmprsor->ts_flag_p = NULL; - pCmprsor->aBuf[0][0] = 1; - pCmprsor->nBuf[0] = 1; + + pCmprsor->nBuf = 1; + + code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf); + if (code) return code; + + pCmprsor->pBuf[0] = 1; + return code; } -static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { +static int32_t tCompSwitchToCopyMode(SCompressor *pCmprsor) { int32_t code = 0; - if (pCmprsor->nVal) { - if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->nVal); - if (code) return code; - } - pCmprsor->nBuf[1] = 0; - - int64_t n = 1; - int64_t value; - int64_t delta; - uint64_t vZigzag; - while (n < pCmprsor->nBuf[0]) { - uint8_t aN[2]; - aN[0] = pCmprsor->aBuf[0][n] & 0xf; - aN[1] = pCmprsor->aBuf[0][n] >> 4; - - n++; - - for (int32_t i = 0; i < 2; i++) { - vZigzag = 0; - for (uint8_t j = 0; j < aN[i]; j++) { - vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (8 * j)); - n++; - } + if (pCmprsor->nVal == 0) goto _exit; - int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); - if (pCmprsor->nBuf[1] == 0) { - delta = 0; - value = delta_of_delta; - } else { - delta = delta_of_delta + delta; - value = delta + value; - } + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], sizeof(int64_t) * pCmprsor->nVal + 1))) { + return code; + } + + int32_t n = 1; + int32_t nBuf = 1; + int64_t value; + int64_t delta; + for (int32_t iVal = 0; iVal < pCmprsor->nVal;) { + uint8_t aN[2] = {(pCmprsor->pBuf[n] & 0xf), (pCmprsor->pBuf[n] >> 4)}; - memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &value, sizeof(int64_t)); - pCmprsor->nBuf[1] += sizeof(int64_t); + n++; - if (n >= pCmprsor->nBuf[0]) break; + for (int32_t i = 0; i < 2; i++) { + uint64_t vZigzag = 0; + for (uint8_t j = 0; j < aN[i]; j++) { + vZigzag |= (((uint64_t)pCmprsor->pBuf[n]) << (8 * j)); + n++; + } + + int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); + if (iVal) { + delta = delta_of_delta + delta; + value = delta + value; + } else { + delta = 0; + value = delta_of_delta; } - } - ASSERT(n == pCmprsor->nBuf[0]); + memcpy(pCmprsor->aBuf[0] + nBuf, &value, sizeof(value)); + nBuf += sizeof(int64_t); - if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[1] + 1); - if (code) return code; + iVal++; + if (iVal >= pCmprsor->nVal) break; } - memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->aBuf[1], pCmprsor->nBuf[1]); - pCmprsor->nBuf[0] = 1 + pCmprsor->nBuf[1]; } - pCmprsor->aBuf[0][0] = 0; + ASSERT(n == pCmprsor->nBuf && nBuf == sizeof(int64_t) * pCmprsor->nBuf + 1); + + uint8_t *pBuf = pCmprsor->pBuf; + pCmprsor->pBuf = pCmprsor->aBuf[0]; + pCmprsor->aBuf[0] = pBuf; + pCmprsor->nBuf = nBuf; + +_exit: + pCmprsor->pBuf[0] = 0; return code; } static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; int64_t ts = *(int64_t *)pData; - ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP); ASSERT(nData == 8); - if (pCmprsor->aBuf[0][0] == 1) { + if (pCmprsor->pBuf[0] == 1) { if (pCmprsor->nVal == 0) { pCmprsor->ts_prev_val = ts; pCmprsor->ts_prev_delta = -ts; } if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) { - code = tCompSetCopyMode(pCmprsor); + code = tCompSwitchToCopyMode(pCmprsor); if (code) return code; goto _copy_cmpr; } int64_t delta = ts - pCmprsor->ts_prev_val; if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { - code = tCompSetCopyMode(pCmprsor); + code = tCompSwitchToCopyMode(pCmprsor); if (code) return code; goto _copy_cmpr; } @@ -1290,37 +1289,35 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t pCmprsor->ts_prev_delta = delta; if ((pCmprsor->nVal & 0x1) == 0) { - if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); - if (code) return code; + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + 17))) { + return code; } - pCmprsor->ts_flag_p = pCmprsor->aBuf[0] + pCmprsor->nBuf[0]; - pCmprsor->nBuf[0]++; + pCmprsor->ts_flag_p = pCmprsor->pBuf + pCmprsor->nBuf; + pCmprsor->nBuf++; pCmprsor->ts_flag_p[0] = 0; while (vZigzag) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff); - pCmprsor->nBuf[0]++; + pCmprsor->pBuf[pCmprsor->nBuf] = (vZigzag & 0xff); + pCmprsor->nBuf++; pCmprsor->ts_flag_p[0]++; vZigzag >>= 8; } } else { while (vZigzag) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff); - pCmprsor->nBuf[0]++; + pCmprsor->pBuf[pCmprsor->nBuf] = (vZigzag & 0xff); + pCmprsor->nBuf++; pCmprsor->ts_flag_p[0] += 0x10; vZigzag >>= 8; } } } else { _copy_cmpr: - if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(ts)); - if (code) return code; + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + sizeof(ts)))) { + return code; } - memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts)); - pCmprsor->nBuf[0] += sizeof(ts); + memcpy(pCmprsor->pBuf + pCmprsor->nBuf, &ts, sizeof(ts)); + pCmprsor->nBuf += sizeof(ts); } pCmprsor->nVal++; @@ -1329,7 +1326,34 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t static int32_t tCompTimestampEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; - // TODO + + if (pCmprsor->nBuf >= sizeof(int64_t) * pCmprsor->nVal + 1 && pCmprsor->pBuf[0] == 1) { + code = tCompSwitchToCopyMode(pCmprsor); + if (code) return code; + } + + if (pCmprsor->cmprAlg == TWO_STAGE_COMP) { + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) { + return code; + } + + *ppData = pCmprsor->aBuf[0]; + int32_t szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf); + if (szComp && szComp < pCmprsor->nBuf) { + pCmprsor->aBuf[0][0] = 1; + *nData = szComp + 1; + } else { + pCmprsor->aBuf[0][0] = 0; + memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf); + *nData = pCmprsor->nBuf + 1; + } + } else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) { + *ppData = pCmprsor->pBuf; + *nData = pCmprsor->nBuf; + } else { + ASSERT(0); + } + return code; } @@ -1344,12 +1368,24 @@ static const uint8_t BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10 static int32_t tCompIntStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t code = 0; + pCmprsor->i_prev = 0; pCmprsor->i_selector = 0; pCmprsor->i_start = 0; pCmprsor->i_end = 0; - pCmprsor->aBuf[0][0] = 0; - pCmprsor->nBuf[0] = 1; + pCmprsor->nBuf = 1; + + code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf); + if (code) return code; + + pCmprsor->pBuf[0] = 0; + + return code; +} + +static int32_t tCompIntSwitchToCopy(SCompressor *pCmprsor) { + int32_t code = 0; + // ASSERT(0); return code; } @@ -1363,26 +1399,18 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) switch (pCmprsor->type) { case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: val = *(int8_t *)pData; break; case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: val = *(int16_t *)pData; break; case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: val = *(int32_t *)pData; break; case TSDB_DATA_TYPE_BIGINT: - val = *(int64_t *)pData; - break; - case TSDB_DATA_TYPE_UTINYINT: - val = *(uint8_t *)pData; - break; - case TSDB_DATA_TYPE_USMALLINT: - val = *(uint16_t *)pData; - break; - case TSDB_DATA_TYPE_UINT: - val = *(uint32_t *)pData; - break; case TSDB_DATA_TYPE_UBIGINT: val = *(int64_t *)pData; break; @@ -1392,14 +1420,16 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) } if (!I64_SAFE_ADD(val, -pCmprsor->i_prev)) { - // TODO + code = tCompIntSwitchToCopy(pCmprsor); + if (code) return code; goto _copy_cmpr; } int64_t diff = val - pCmprsor->i_prev; uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff); if (vZigzag >= SIMPLE8B_MAX) { - // TODO + code = tCompIntSwitchToCopy(pCmprsor); + if (code) return code; goto _copy_cmpr; } @@ -1424,12 +1454,12 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) nEle = SELECTOR_TO_ELEMS[pCmprsor->i_selector]; if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t)); + code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + sizeof(uint64_t)); if (code) return code; } - uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]); - pCmprsor->nBuf[0] += sizeof(uint64_t); + uint64_t *bp = (uint64_t *)(pCmprsor->pBuf + pCmprsor->nBuf); + pCmprsor->nBuf += sizeof(uint64_t); bp[0] = pCmprsor->i_selector; uint8_t bits = BIT_PER_INTEGER[pCmprsor->i_selector]; for (int32_t iVal = 0; iVal < nEle; iVal++) { @@ -1448,11 +1478,11 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) } } else { _copy_cmpr: - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); + code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + nData); if (code) return code; - memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); - pCmprsor->nBuf[0] += nData; + memcpy(pCmprsor->pBuf + pCmprsor->nBuf, pData, nData); + pCmprsor->nBuf += nData; } pCmprsor->nVal++; @@ -1461,20 +1491,107 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData) static int32_t tCompIntEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; - // TODO + + if (pCmprsor->nBuf >= DATA_TYPE_INFO[pCmprsor->type].bytes * pCmprsor->nVal + 1 && pCmprsor->pBuf[0] == 0) { + code = tCompIntSwitchToCopy(pCmprsor); + if (code) return code; + } + + if (pCmprsor->cmprAlg == TWO_STAGE_COMP) { + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) { + return code; + } + + *ppData = pCmprsor->aBuf[0]; + int32_t szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf); + if (szComp && szComp < pCmprsor->nBuf) { + pCmprsor->aBuf[0][0] = 1; + *nData = szComp + 1; + } else { + pCmprsor->aBuf[0][0] = 0; + memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf); + *nData = pCmprsor->nBuf + 1; + } + } else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) { + *ppData = pCmprsor->pBuf; + *nData = pCmprsor->nBuf; + } else { + ASSERT(0); + } + return code; } // Float ===================================================== static int32_t tCompFloatStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t code = 0; + pCmprsor->f_prev = 0; pCmprsor->f_flag_p = NULL; - pCmprsor->aBuf[0][0] = 0; - pCmprsor->nBuf[0] = 1; + + pCmprsor->nBuf = 1; + + code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf); + if (code) return code; + + pCmprsor->pBuf[0] = 0; + return code; } +static int32_t tCompFloatSwitchToCopy(SCompressor *pCmprsor) { + int32_t code = 0; + + if (pCmprsor->nVal == 0) goto _exit; + + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], sizeof(float) * pCmprsor->nVal + 1))) { + return code; + } + + int32_t n = 1; + int32_t nBuf = 1; + union { + float f; + uint32_t u; + } val = {.u = 0}; + + for (int32_t iVal = 0; iVal < pCmprsor->nVal;) { + uint8_t flags[2] = {(pCmprsor->pBuf[n] & 0xf), (pCmprsor->pBuf[n] >> 4)}; + + n++; + + for (int8_t i = 0; i < 2; i++) { + uint8_t flag = flags[i]; + + uint32_t diff = 0; + int8_t nBytes = (flag & 0x7) + 1; + for (int j = 0; j < nBytes; j++) { + diff |= (((uint32_t)pCmprsor->pBuf[n]) << (8 * j)); + n++; + } + + if (flag & 0x8) { + diff <<= (32 - nBytes * 8); + } + + val.u ^= diff; + + memcpy(pCmprsor->aBuf[0] + nBuf, &val.f, sizeof(val)); + nBuf += sizeof(val); + + iVal++; + if (iVal >= pCmprsor->nVal) break; + } + } + uint8_t *pBuf = pCmprsor->pBuf; + pCmprsor->pBuf = pCmprsor->aBuf[0]; + pCmprsor->aBuf[0] = pBuf; + pCmprsor->nBuf = nBuf; + +_exit: + pCmprsor->pBuf[0] = 1; + return code; +} static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; @@ -1507,13 +1624,12 @@ static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nDat if (nBytes == 0) nBytes++; if ((pCmprsor->nVal & 0x1) == 0) { - if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9); - if (code) return code; + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + 9))) { + return code; } - pCmprsor->f_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; - pCmprsor->nBuf[0]++; + pCmprsor->f_flag_p = &pCmprsor->pBuf[pCmprsor->nBuf]; + pCmprsor->nBuf++; if (clz < ctz) { pCmprsor->f_flag_p[0] = (0x08 | (nBytes - 1)); @@ -1528,8 +1644,8 @@ static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nDat } } for (; nBytes; nBytes--) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); - pCmprsor->nBuf[0]++; + pCmprsor->pBuf[pCmprsor->nBuf] = (diff & 0xff); + pCmprsor->nBuf++; diff >>= BITS_PER_BYTE; } pCmprsor->nVal++; @@ -1539,20 +1655,107 @@ static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nDat static int32_t tCompFloatEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; - // TODO + + if (pCmprsor->nBuf >= sizeof(float) * pCmprsor->nVal + 1) { + code = tCompFloatSwitchToCopy(pCmprsor); + if (code) return code; + } + + if (pCmprsor->cmprAlg == TWO_STAGE_COMP) { + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) { + return code; + } + + *ppData = pCmprsor->aBuf[0]; + int32_t szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf); + if (szComp && szComp < pCmprsor->nBuf) { + pCmprsor->aBuf[0][0] = 1; + *nData = szComp + 1; + } else { + pCmprsor->aBuf[0][0] = 0; + memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf); + *nData = pCmprsor->nBuf + 1; + } + } else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) { + *ppData = pCmprsor->pBuf; + *nData = pCmprsor->nBuf; + } else { + ASSERT(0); + } + return code; } // Double ===================================================== static int32_t tCompDoubleStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t code = 0; + pCmprsor->d_prev = 0; pCmprsor->d_flag_p = NULL; - pCmprsor->aBuf[0][0] = 0; - pCmprsor->nBuf[0] = 1; + + pCmprsor->nBuf = 1; + + code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf); + if (code) return code; + + pCmprsor->pBuf[0] = 0; + return code; } +static int32_t tCompDoubleSwitchToCopy(SCompressor *pCmprsor) { + int32_t code = 0; + + if (pCmprsor->nVal == 0) goto _exit; + + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], sizeof(double) * pCmprsor->nVal + 1))) { + return code; + } + + int32_t n = 1; + int32_t nBuf = 1; + union { + double f; + uint64_t u; + } val = {.u = 0}; + + for (int32_t iVal = 0; iVal < pCmprsor->nVal;) { + uint8_t flags[2] = {(pCmprsor->pBuf[n] & 0xf), (pCmprsor->pBuf[n] >> 4)}; + + n++; + + for (int8_t i = 0; i < 2; i++) { + uint8_t flag = flags[i]; + + uint64_t diff = 0; + int8_t nBytes = (flag & 0x7) + 1; + for (int j = 0; j < nBytes; j++) { + diff |= (((uint64_t)pCmprsor->pBuf[n]) << (8 * j)); + n++; + } + + if (flag & 0x8) { + diff <<= (64 - nBytes * 8); + } + + val.u ^= diff; + + memcpy(pCmprsor->aBuf[0] + nBuf, &val.f, sizeof(val)); + nBuf += sizeof(val); + + iVal++; + if (iVal >= pCmprsor->nVal) break; + } + } + uint8_t *pBuf = pCmprsor->pBuf; + pCmprsor->pBuf = pCmprsor->aBuf[0]; + pCmprsor->aBuf[0] = pBuf; + pCmprsor->nBuf = nBuf; + +_exit: + pCmprsor->pBuf[0] = 1; + return code; +} static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; @@ -1585,13 +1788,12 @@ static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nDa if (nBytes == 0) nBytes++; if ((pCmprsor->nVal & 0x1) == 0) { - if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17); - if (code) return code; + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + 17))) { + return code; } - pCmprsor->d_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; - pCmprsor->nBuf[0]++; + pCmprsor->d_flag_p = &pCmprsor->pBuf[pCmprsor->nBuf]; + pCmprsor->nBuf++; if (clz < ctz) { pCmprsor->d_flag_p[0] = (0x08 | (nBytes - 1)); @@ -1606,8 +1808,8 @@ static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nDa } } for (; nBytes; nBytes--) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff); - pCmprsor->nBuf[0]++; + pCmprsor->pBuf[pCmprsor->nBuf] = (diff & 0xff); + pCmprsor->nBuf++; diff >>= BITS_PER_BYTE; } pCmprsor->nVal++; @@ -1617,28 +1819,53 @@ static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nDa static int32_t tCompDoubleEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; - // TODO + + if (pCmprsor->nBuf >= sizeof(double) * pCmprsor->nVal + 1) { + code = tCompDoubleSwitchToCopy(pCmprsor); + if (code) return code; + } + + if (pCmprsor->cmprAlg == TWO_STAGE_COMP) { + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) { + return code; + } + + *ppData = pCmprsor->aBuf[0]; + int32_t szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf); + if (szComp && szComp < pCmprsor->nBuf) { + pCmprsor->aBuf[0][0] = 1; + *nData = szComp + 1; + } else { + pCmprsor->aBuf[0][0] = 0; + memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf); + *nData = pCmprsor->nBuf + 1; + } + } else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) { + *ppData = pCmprsor->pBuf; + *nData = pCmprsor->nBuf; + } else { + ASSERT(0); + } + return code; } // Binary ===================================================== static int32_t tCompBinaryStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { - int32_t code = 0; - pCmprsor->nBuf[0] = 0; - return code; + pCmprsor->nBuf = 1; + return 0; } static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) { int32_t code = 0; if (nData) { - if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData); - if (code) return code; + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + nData))) { + return code; } - memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData); - pCmprsor->nBuf[0] += nData; + memcpy(pCmprsor->pBuf + pCmprsor->nBuf, pData, nData); + pCmprsor->nBuf += nData; } pCmprsor->nVal++; @@ -1647,7 +1874,25 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nDa static int32_t tCompBinaryEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; - // TODO + + if (pCmprsor->nBuf == 1) return code; + + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf))) { + return code; + } + + int32_t szComp = + LZ4_compress_default(pCmprsor->pBuf + 1, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf - 1, pCmprsor->nBuf - 1); + if (szComp && szComp < pCmprsor->nBuf - 1) { + pCmprsor->aBuf[0][0] = 1; + *ppData = pCmprsor->aBuf[0]; + *nData = szComp + 1; + } else { + pCmprsor->pBuf[0] = 0; + *ppData = pCmprsor->pBuf; + *nData = pCmprsor->nBuf; + } + return code; } @@ -1655,9 +1900,8 @@ static int32_t tCompBinaryEnd(SCompressor *pCmprsor, const uint8_t **ppData, int static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000}; static int32_t tCompBoolStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { - int32_t code = 0; - pCmprsor->nBuf[0] = 0; - return code; + pCmprsor->nBuf = 0; + return 0; } static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData) { @@ -1665,19 +1909,18 @@ static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData bool vBool = *(int8_t *)pData; - int32_t mod4 = pCmprsor->nVal & 3; + int32_t mod4 = (pCmprsor->nVal & 3); if (mod4 == 0) { - pCmprsor->nBuf[0]++; + pCmprsor->nBuf++; - if (pCmprsor->autoAlloc) { - code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0]); - if (code) return code; + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf))) { + return code; } - pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] = 0; + pCmprsor->pBuf[pCmprsor->nBuf - 1] = 0; } if (vBool) { - pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] |= BOOL_CMPR_TABLE[mod4]; + pCmprsor->pBuf[pCmprsor->nBuf - 1] |= BOOL_CMPR_TABLE[mod4]; } pCmprsor->nVal++; @@ -1686,7 +1929,30 @@ static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData static int32_t tCompBoolEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; - // TODO + + if (pCmprsor->cmprAlg == TWO_STAGE_COMP) { + if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) { + return code; + } + + *ppData = pCmprsor->aBuf[0]; + + int32_t szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf); + if (szComp && szComp < pCmprsor->nBuf) { + pCmprsor->aBuf[0][0] = 1; + *nData = szComp + 1; + } else { + pCmprsor->aBuf[0][0] = 0; + memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf); + *nData = pCmprsor->nBuf + 1; + } + } else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) { + *ppData = pCmprsor->pBuf; + *nData = pCmprsor->nBuf; + } else { + ASSERT(0); + } + return code; } @@ -1697,23 +1963,17 @@ int32_t tCompressorCreate(SCompressor **ppCmprsor) { *ppCmprsor = (SCompressor *)taosMemoryCalloc(1, sizeof(SCompressor)); if ((*ppCmprsor) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + return code; } - code = tRealloc(&(*ppCmprsor)->aBuf[0], 1024); - if (code) { - taosMemoryFree(*ppCmprsor); - *ppCmprsor = NULL; - goto _exit; - } - -_exit: return code; } int32_t tCompressorDestroy(SCompressor *pCmprsor) { int32_t code = 0; + tFree(pCmprsor->pBuf); + int32_t nBuf = sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]); for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) { tFree(pCmprsor->aBuf[iBuf]); @@ -1745,6 +2005,8 @@ int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nDa *ppData = NULL; *nData = 0; + if (pCmprsor->nVal == 0) return code; + if (DATA_TYPE_INFO[pCmprsor->type].endFn) { return DATA_TYPE_INFO[pCmprsor->type].endFn(pCmprsor, ppData, nData); } -- GitLab