提交 787e1662 编写于 作者: H Hongze Cheng

refact more code

上级 08feade5
......@@ -1159,8 +1159,9 @@ struct SCompressor {
int8_t cmprAlg;
int8_t autoAlloc;
int32_t nVal;
uint8_t *aBuf[2];
int64_t nBuf[2];
uint8_t *pBuf;
int32_t nBuf;
uint8_t *aBuf[1];
union {
// Timestamp ----
struct {
......@@ -1193,93 +1194,91 @@ struct SCompressor {
// Timestamp =====================================================
static int32_t tCompTimestampStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0;
pCmprsor->ts_prev_val = 0;
pCmprsor->ts_prev_delta = 0;
pCmprsor->ts_flag_p = NULL;
pCmprsor->aBuf[0][0] = 1;
pCmprsor->nBuf[0] = 1;
pCmprsor->nBuf = 1;
code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf);
if (code) return code;
pCmprsor->pBuf[0] = 1;
return code;
}
static int32_t tCompSetCopyMode(SCompressor *pCmprsor) {
static int32_t tCompSwitchToCopyMode(SCompressor *pCmprsor) {
int32_t code = 0;
if (pCmprsor->nVal) {
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->nVal);
if (code) return code;
if (pCmprsor->nVal == 0) goto _exit;
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], sizeof(int64_t) * pCmprsor->nVal + 1))) {
return code;
}
pCmprsor->nBuf[1] = 0;
int64_t n = 1;
int32_t n = 1;
int32_t nBuf = 1;
int64_t value;
int64_t delta;
uint64_t vZigzag;
while (n < pCmprsor->nBuf[0]) {
uint8_t aN[2];
aN[0] = pCmprsor->aBuf[0][n] & 0xf;
aN[1] = pCmprsor->aBuf[0][n] >> 4;
for (int32_t iVal = 0; iVal < pCmprsor->nVal;) {
uint8_t aN[2] = {(pCmprsor->pBuf[n] & 0xf), (pCmprsor->pBuf[n] >> 4)};
n++;
for (int32_t i = 0; i < 2; i++) {
vZigzag = 0;
uint64_t vZigzag = 0;
for (uint8_t j = 0; j < aN[i]; j++) {
vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (8 * j));
vZigzag |= (((uint64_t)pCmprsor->pBuf[n]) << (8 * j));
n++;
}
int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag);
if (pCmprsor->nBuf[1] == 0) {
delta = 0;
value = delta_of_delta;
} else {
if (iVal) {
delta = delta_of_delta + delta;
value = delta + value;
} else {
delta = 0;
value = delta_of_delta;
}
memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &value, sizeof(int64_t));
pCmprsor->nBuf[1] += sizeof(int64_t);
memcpy(pCmprsor->aBuf[0] + nBuf, &value, sizeof(value));
nBuf += sizeof(int64_t);
if (n >= pCmprsor->nBuf[0]) break;
iVal++;
if (iVal >= pCmprsor->nVal) break;
}
}
ASSERT(n == pCmprsor->nBuf[0]);
ASSERT(n == pCmprsor->nBuf && nBuf == sizeof(int64_t) * pCmprsor->nBuf + 1);
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[1] + 1);
if (code) return code;
}
memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->aBuf[1], pCmprsor->nBuf[1]);
pCmprsor->nBuf[0] = 1 + pCmprsor->nBuf[1];
}
pCmprsor->aBuf[0][0] = 0;
uint8_t *pBuf = pCmprsor->pBuf;
pCmprsor->pBuf = pCmprsor->aBuf[0];
pCmprsor->aBuf[0] = pBuf;
pCmprsor->nBuf = nBuf;
_exit:
pCmprsor->pBuf[0] = 0;
return code;
}
static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0;
int64_t ts = *(int64_t *)pData;
ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(nData == 8);
if (pCmprsor->aBuf[0][0] == 1) {
if (pCmprsor->pBuf[0] == 1) {
if (pCmprsor->nVal == 0) {
pCmprsor->ts_prev_val = ts;
pCmprsor->ts_prev_delta = -ts;
}
if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) {
code = tCompSetCopyMode(pCmprsor);
code = tCompSwitchToCopyMode(pCmprsor);
if (code) return code;
goto _copy_cmpr;
}
int64_t delta = ts - pCmprsor->ts_prev_val;
if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) {
code = tCompSetCopyMode(pCmprsor);
code = tCompSwitchToCopyMode(pCmprsor);
if (code) return code;
goto _copy_cmpr;
}
......@@ -1290,37 +1289,35 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t
pCmprsor->ts_prev_delta = delta;
if ((pCmprsor->nVal & 0x1) == 0) {
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17);
if (code) return code;
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + 17))) {
return code;
}
pCmprsor->ts_flag_p = pCmprsor->aBuf[0] + pCmprsor->nBuf[0];
pCmprsor->nBuf[0]++;
pCmprsor->ts_flag_p = pCmprsor->pBuf + pCmprsor->nBuf;
pCmprsor->nBuf++;
pCmprsor->ts_flag_p[0] = 0;
while (vZigzag) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff);
pCmprsor->nBuf[0]++;
pCmprsor->pBuf[pCmprsor->nBuf] = (vZigzag & 0xff);
pCmprsor->nBuf++;
pCmprsor->ts_flag_p[0]++;
vZigzag >>= 8;
}
} else {
while (vZigzag) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff);
pCmprsor->nBuf[0]++;
pCmprsor->pBuf[pCmprsor->nBuf] = (vZigzag & 0xff);
pCmprsor->nBuf++;
pCmprsor->ts_flag_p[0] += 0x10;
vZigzag >>= 8;
}
}
} else {
_copy_cmpr:
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(ts));
if (code) return code;
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + sizeof(ts)))) {
return code;
}
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts));
pCmprsor->nBuf[0] += sizeof(ts);
memcpy(pCmprsor->pBuf + pCmprsor->nBuf, &ts, sizeof(ts));
pCmprsor->nBuf += sizeof(ts);
}
pCmprsor->nVal++;
......@@ -1329,7 +1326,34 @@ static int32_t tCompTimestamp(SCompressor *pCmprsor, const void *pData, int32_t
static int32_t tCompTimestampEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0;
// TODO
if (pCmprsor->nBuf >= sizeof(int64_t) * pCmprsor->nVal + 1 && pCmprsor->pBuf[0] == 1) {
code = tCompSwitchToCopyMode(pCmprsor);
if (code) return code;
}
if (pCmprsor->cmprAlg == TWO_STAGE_COMP) {
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) {
return code;
}
*ppData = pCmprsor->aBuf[0];
int32_t szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf);
if (szComp && szComp < pCmprsor->nBuf) {
pCmprsor->aBuf[0][0] = 1;
*nData = szComp + 1;
} else {
pCmprsor->aBuf[0][0] = 0;
memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf);
*nData = pCmprsor->nBuf + 1;
}
} else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) {
*ppData = pCmprsor->pBuf;
*nData = pCmprsor->nBuf;
} else {
ASSERT(0);
}
return code;
}
......@@ -1344,12 +1368,24 @@ static const uint8_t BIT_TO_SELECTOR[] = {0, 2, 3, 4, 5, 6, 7, 8, 9, 10
static int32_t tCompIntStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0;
pCmprsor->i_prev = 0;
pCmprsor->i_selector = 0;
pCmprsor->i_start = 0;
pCmprsor->i_end = 0;
pCmprsor->aBuf[0][0] = 0;
pCmprsor->nBuf[0] = 1;
pCmprsor->nBuf = 1;
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf);
if (code) return code;
pCmprsor->pBuf[0] = 0;
return code;
}
static int32_t tCompIntSwitchToCopy(SCompressor *pCmprsor) {
int32_t code = 0;
// ASSERT(0);
return code;
}
......@@ -1363,26 +1399,18 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData)
switch (pCmprsor->type) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
val = *(int8_t *)pData;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
val = *(int16_t *)pData;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
val = *(int32_t *)pData;
break;
case TSDB_DATA_TYPE_BIGINT:
val = *(int64_t *)pData;
break;
case TSDB_DATA_TYPE_UTINYINT:
val = *(uint8_t *)pData;
break;
case TSDB_DATA_TYPE_USMALLINT:
val = *(uint16_t *)pData;
break;
case TSDB_DATA_TYPE_UINT:
val = *(uint32_t *)pData;
break;
case TSDB_DATA_TYPE_UBIGINT:
val = *(int64_t *)pData;
break;
......@@ -1392,14 +1420,16 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData)
}
if (!I64_SAFE_ADD(val, -pCmprsor->i_prev)) {
// TODO
code = tCompIntSwitchToCopy(pCmprsor);
if (code) return code;
goto _copy_cmpr;
}
int64_t diff = val - pCmprsor->i_prev;
uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, diff);
if (vZigzag >= SIMPLE8B_MAX) {
// TODO
code = tCompIntSwitchToCopy(pCmprsor);
if (code) return code;
goto _copy_cmpr;
}
......@@ -1424,12 +1454,12 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData)
nEle = SELECTOR_TO_ELEMS[pCmprsor->i_selector];
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(uint64_t));
code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + sizeof(uint64_t));
if (code) return code;
}
uint64_t *bp = (uint64_t *)(pCmprsor->aBuf[0] + pCmprsor->nBuf[0]);
pCmprsor->nBuf[0] += sizeof(uint64_t);
uint64_t *bp = (uint64_t *)(pCmprsor->pBuf + pCmprsor->nBuf);
pCmprsor->nBuf += sizeof(uint64_t);
bp[0] = pCmprsor->i_selector;
uint8_t bits = BIT_PER_INTEGER[pCmprsor->i_selector];
for (int32_t iVal = 0; iVal < nEle; iVal++) {
......@@ -1448,11 +1478,11 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData)
}
} else {
_copy_cmpr:
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData);
code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + nData);
if (code) return code;
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData);
pCmprsor->nBuf[0] += nData;
memcpy(pCmprsor->pBuf + pCmprsor->nBuf, pData, nData);
pCmprsor->nBuf += nData;
}
pCmprsor->nVal++;
......@@ -1461,20 +1491,107 @@ static int32_t tCompInt(SCompressor *pCmprsor, const void *pData, int32_t nData)
static int32_t tCompIntEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0;
// TODO
if (pCmprsor->nBuf >= DATA_TYPE_INFO[pCmprsor->type].bytes * pCmprsor->nVal + 1 && pCmprsor->pBuf[0] == 0) {
code = tCompIntSwitchToCopy(pCmprsor);
if (code) return code;
}
if (pCmprsor->cmprAlg == TWO_STAGE_COMP) {
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) {
return code;
}
*ppData = pCmprsor->aBuf[0];
int32_t szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf);
if (szComp && szComp < pCmprsor->nBuf) {
pCmprsor->aBuf[0][0] = 1;
*nData = szComp + 1;
} else {
pCmprsor->aBuf[0][0] = 0;
memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf);
*nData = pCmprsor->nBuf + 1;
}
} else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) {
*ppData = pCmprsor->pBuf;
*nData = pCmprsor->nBuf;
} else {
ASSERT(0);
}
return code;
}
// Float =====================================================
static int32_t tCompFloatStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0;
pCmprsor->f_prev = 0;
pCmprsor->f_flag_p = NULL;
pCmprsor->aBuf[0][0] = 0;
pCmprsor->nBuf[0] = 1;
pCmprsor->nBuf = 1;
code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf);
if (code) return code;
pCmprsor->pBuf[0] = 0;
return code;
}
static int32_t tCompFloatSwitchToCopy(SCompressor *pCmprsor) {
int32_t code = 0;
if (pCmprsor->nVal == 0) goto _exit;
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], sizeof(float) * pCmprsor->nVal + 1))) {
return code;
}
int32_t n = 1;
int32_t nBuf = 1;
union {
float f;
uint32_t u;
} val = {.u = 0};
for (int32_t iVal = 0; iVal < pCmprsor->nVal;) {
uint8_t flags[2] = {(pCmprsor->pBuf[n] & 0xf), (pCmprsor->pBuf[n] >> 4)};
n++;
for (int8_t i = 0; i < 2; i++) {
uint8_t flag = flags[i];
uint32_t diff = 0;
int8_t nBytes = (flag & 0x7) + 1;
for (int j = 0; j < nBytes; j++) {
diff |= (((uint32_t)pCmprsor->pBuf[n]) << (8 * j));
n++;
}
if (flag & 0x8) {
diff <<= (32 - nBytes * 8);
}
val.u ^= diff;
memcpy(pCmprsor->aBuf[0] + nBuf, &val.f, sizeof(val));
nBuf += sizeof(val);
iVal++;
if (iVal >= pCmprsor->nVal) break;
}
}
uint8_t *pBuf = pCmprsor->pBuf;
pCmprsor->pBuf = pCmprsor->aBuf[0];
pCmprsor->aBuf[0] = pBuf;
pCmprsor->nBuf = nBuf;
_exit:
pCmprsor->pBuf[0] = 1;
return code;
}
static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0;
......@@ -1507,13 +1624,12 @@ static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nDat
if (nBytes == 0) nBytes++;
if ((pCmprsor->nVal & 0x1) == 0) {
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 9);
if (code) return code;
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + 9))) {
return code;
}
pCmprsor->f_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]];
pCmprsor->nBuf[0]++;
pCmprsor->f_flag_p = &pCmprsor->pBuf[pCmprsor->nBuf];
pCmprsor->nBuf++;
if (clz < ctz) {
pCmprsor->f_flag_p[0] = (0x08 | (nBytes - 1));
......@@ -1528,8 +1644,8 @@ static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nDat
}
}
for (; nBytes; nBytes--) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff);
pCmprsor->nBuf[0]++;
pCmprsor->pBuf[pCmprsor->nBuf] = (diff & 0xff);
pCmprsor->nBuf++;
diff >>= BITS_PER_BYTE;
}
pCmprsor->nVal++;
......@@ -1539,20 +1655,107 @@ static int32_t tCompFloat(SCompressor *pCmprsor, const void *pData, int32_t nDat
static int32_t tCompFloatEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0;
// TODO
if (pCmprsor->nBuf >= sizeof(float) * pCmprsor->nVal + 1) {
code = tCompFloatSwitchToCopy(pCmprsor);
if (code) return code;
}
if (pCmprsor->cmprAlg == TWO_STAGE_COMP) {
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) {
return code;
}
*ppData = pCmprsor->aBuf[0];
int32_t szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf);
if (szComp && szComp < pCmprsor->nBuf) {
pCmprsor->aBuf[0][0] = 1;
*nData = szComp + 1;
} else {
pCmprsor->aBuf[0][0] = 0;
memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf);
*nData = pCmprsor->nBuf + 1;
}
} else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) {
*ppData = pCmprsor->pBuf;
*nData = pCmprsor->nBuf;
} else {
ASSERT(0);
}
return code;
}
// Double =====================================================
static int32_t tCompDoubleStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0;
pCmprsor->d_prev = 0;
pCmprsor->d_flag_p = NULL;
pCmprsor->aBuf[0][0] = 0;
pCmprsor->nBuf[0] = 1;
pCmprsor->nBuf = 1;
code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf);
if (code) return code;
pCmprsor->pBuf[0] = 0;
return code;
}
static int32_t tCompDoubleSwitchToCopy(SCompressor *pCmprsor) {
int32_t code = 0;
if (pCmprsor->nVal == 0) goto _exit;
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], sizeof(double) * pCmprsor->nVal + 1))) {
return code;
}
int32_t n = 1;
int32_t nBuf = 1;
union {
double f;
uint64_t u;
} val = {.u = 0};
for (int32_t iVal = 0; iVal < pCmprsor->nVal;) {
uint8_t flags[2] = {(pCmprsor->pBuf[n] & 0xf), (pCmprsor->pBuf[n] >> 4)};
n++;
for (int8_t i = 0; i < 2; i++) {
uint8_t flag = flags[i];
uint64_t diff = 0;
int8_t nBytes = (flag & 0x7) + 1;
for (int j = 0; j < nBytes; j++) {
diff |= (((uint64_t)pCmprsor->pBuf[n]) << (8 * j));
n++;
}
if (flag & 0x8) {
diff <<= (64 - nBytes * 8);
}
val.u ^= diff;
memcpy(pCmprsor->aBuf[0] + nBuf, &val.f, sizeof(val));
nBuf += sizeof(val);
iVal++;
if (iVal >= pCmprsor->nVal) break;
}
}
uint8_t *pBuf = pCmprsor->pBuf;
pCmprsor->pBuf = pCmprsor->aBuf[0];
pCmprsor->aBuf[0] = pBuf;
pCmprsor->nBuf = nBuf;
_exit:
pCmprsor->pBuf[0] = 1;
return code;
}
static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0;
......@@ -1585,13 +1788,12 @@ static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nDa
if (nBytes == 0) nBytes++;
if ((pCmprsor->nVal & 0x1) == 0) {
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17);
if (code) return code;
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + 17))) {
return code;
}
pCmprsor->d_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]];
pCmprsor->nBuf[0]++;
pCmprsor->d_flag_p = &pCmprsor->pBuf[pCmprsor->nBuf];
pCmprsor->nBuf++;
if (clz < ctz) {
pCmprsor->d_flag_p[0] = (0x08 | (nBytes - 1));
......@@ -1606,8 +1808,8 @@ static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nDa
}
}
for (; nBytes; nBytes--) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (diff & 0xff);
pCmprsor->nBuf[0]++;
pCmprsor->pBuf[pCmprsor->nBuf] = (diff & 0xff);
pCmprsor->nBuf++;
diff >>= BITS_PER_BYTE;
}
pCmprsor->nVal++;
......@@ -1617,28 +1819,53 @@ static int32_t tCompDouble(SCompressor *pCmprsor, const void *pData, int32_t nDa
static int32_t tCompDoubleEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0;
// TODO
if (pCmprsor->nBuf >= sizeof(double) * pCmprsor->nVal + 1) {
code = tCompDoubleSwitchToCopy(pCmprsor);
if (code) return code;
}
if (pCmprsor->cmprAlg == TWO_STAGE_COMP) {
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) {
return code;
}
*ppData = pCmprsor->aBuf[0];
int32_t szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf);
if (szComp && szComp < pCmprsor->nBuf) {
pCmprsor->aBuf[0][0] = 1;
*nData = szComp + 1;
} else {
pCmprsor->aBuf[0][0] = 0;
memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf);
*nData = pCmprsor->nBuf + 1;
}
} else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) {
*ppData = pCmprsor->pBuf;
*nData = pCmprsor->nBuf;
} else {
ASSERT(0);
}
return code;
}
// Binary =====================================================
static int32_t tCompBinaryStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0;
pCmprsor->nBuf[0] = 0;
return code;
pCmprsor->nBuf = 1;
return 0;
}
static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nData) {
int32_t code = 0;
if (nData) {
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + nData);
if (code) return code;
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf + nData))) {
return code;
}
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], pData, nData);
pCmprsor->nBuf[0] += nData;
memcpy(pCmprsor->pBuf + pCmprsor->nBuf, pData, nData);
pCmprsor->nBuf += nData;
}
pCmprsor->nVal++;
......@@ -1647,7 +1874,25 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const void *pData, int32_t nDa
static int32_t tCompBinaryEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0;
// TODO
if (pCmprsor->nBuf == 1) return code;
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf))) {
return code;
}
int32_t szComp =
LZ4_compress_default(pCmprsor->pBuf + 1, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf - 1, pCmprsor->nBuf - 1);
if (szComp && szComp < pCmprsor->nBuf - 1) {
pCmprsor->aBuf[0][0] = 1;
*ppData = pCmprsor->aBuf[0];
*nData = szComp + 1;
} else {
pCmprsor->pBuf[0] = 0;
*ppData = pCmprsor->pBuf;
*nData = pCmprsor->nBuf;
}
return code;
}
......@@ -1655,9 +1900,8 @@ static int32_t tCompBinaryEnd(SCompressor *pCmprsor, const uint8_t **ppData, int
static const uint8_t BOOL_CMPR_TABLE[] = {0b01, 0b0100, 0b010000, 0b01000000};
static int32_t tCompBoolStart(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
int32_t code = 0;
pCmprsor->nBuf[0] = 0;
return code;
pCmprsor->nBuf = 0;
return 0;
}
static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData) {
......@@ -1665,19 +1909,18 @@ static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData
bool vBool = *(int8_t *)pData;
int32_t mod4 = pCmprsor->nVal & 3;
int32_t mod4 = (pCmprsor->nVal & 3);
if (mod4 == 0) {
pCmprsor->nBuf[0]++;
pCmprsor->nBuf++;
if (pCmprsor->autoAlloc) {
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0]);
if (code) return code;
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->pBuf, pCmprsor->nBuf))) {
return code;
}
pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] = 0;
pCmprsor->pBuf[pCmprsor->nBuf - 1] = 0;
}
if (vBool) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0] - 1] |= BOOL_CMPR_TABLE[mod4];
pCmprsor->pBuf[pCmprsor->nBuf - 1] |= BOOL_CMPR_TABLE[mod4];
}
pCmprsor->nVal++;
......@@ -1686,7 +1929,30 @@ static int32_t tCompBool(SCompressor *pCmprsor, const void *pData, int32_t nData
static int32_t tCompBoolEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0;
// TODO
if (pCmprsor->cmprAlg == TWO_STAGE_COMP) {
if (pCmprsor->autoAlloc && (code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf + 1))) {
return code;
}
*ppData = pCmprsor->aBuf[0];
int32_t szComp = LZ4_compress_default(pCmprsor->pBuf, pCmprsor->aBuf[0] + 1, pCmprsor->nBuf, pCmprsor->nBuf);
if (szComp && szComp < pCmprsor->nBuf) {
pCmprsor->aBuf[0][0] = 1;
*nData = szComp + 1;
} else {
pCmprsor->aBuf[0][0] = 0;
memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->pBuf, pCmprsor->nBuf);
*nData = pCmprsor->nBuf + 1;
}
} else if (pCmprsor->cmprAlg == ONE_STAGE_COMP) {
*ppData = pCmprsor->pBuf;
*nData = pCmprsor->nBuf;
} else {
ASSERT(0);
}
return code;
}
......@@ -1697,23 +1963,17 @@ int32_t tCompressorCreate(SCompressor **ppCmprsor) {
*ppCmprsor = (SCompressor *)taosMemoryCalloc(1, sizeof(SCompressor));
if ((*ppCmprsor) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tRealloc(&(*ppCmprsor)->aBuf[0], 1024);
if (code) {
taosMemoryFree(*ppCmprsor);
*ppCmprsor = NULL;
goto _exit;
return code;
}
_exit:
return code;
}
int32_t tCompressorDestroy(SCompressor *pCmprsor) {
int32_t code = 0;
tFree(pCmprsor->pBuf);
int32_t nBuf = sizeof(pCmprsor->aBuf) / sizeof(pCmprsor->aBuf[0]);
for (int32_t iBuf = 0; iBuf < nBuf; iBuf++) {
tFree(pCmprsor->aBuf[iBuf]);
......@@ -1745,6 +2005,8 @@ int32_t tCompressEnd(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nDa
*ppData = NULL;
*nData = 0;
if (pCmprsor->nVal == 0) return code;
if (DATA_TYPE_INFO[pCmprsor->type].endFn) {
return DATA_TYPE_INFO[pCmprsor->type].endFn(pCmprsor, ppData, nData);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册