提交 f2cd319d 编写于 作者: H Hongze Cheng

more code

上级 7a21781a
...@@ -1004,12 +1004,12 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co ...@@ -1004,12 +1004,12 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co
typedef struct { typedef struct {
int8_t type; int8_t type;
int8_t cmprAlg; int8_t cmprAlg;
int8_t autoAlloc;
uint8_t *aBuf[2]; uint8_t *aBuf[2];
int64_t nBuf[2]; int64_t nBuf[2];
union { union {
// Timestamp ---- // Timestamp ----
struct { struct {
int8_t ts_copy;
int32_t ts_n; int32_t ts_n;
int64_t ts_prev_val; int64_t ts_prev_val;
int64_t ts_prev_delta; int64_t ts_prev_delta;
...@@ -1051,124 +1051,123 @@ static int32_t tCompSetCopyMode(SCompressor *pCmprsor) { ...@@ -1051,124 +1051,123 @@ static int32_t tCompSetCopyMode(SCompressor *pCmprsor) {
int32_t code = 0; int32_t code = 0;
if (pCmprsor->ts_n) { if (pCmprsor->ts_n) {
code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * (pCmprsor->ts_n + 1)); if (pCmprsor->autoAlloc) {
if (code) return code; code = tRealloc(&pCmprsor->aBuf[1], sizeof(int64_t) * pCmprsor->ts_n);
pCmprsor->nBuf[1] = 1; if (code) return code;
}
pCmprsor->nBuf[1] = 0;
int64_t n = 1; int64_t n = 1;
int64_t valPrev; int64_t value;
int64_t delPrev; int64_t delta;
uint64_t vZigzag; uint64_t vZigzag;
while (n < pCmprsor->nBuf[0]) { while (n < pCmprsor->nBuf[0]) {
uint8_t n1 = pCmprsor->aBuf[0][0] & 0xf; uint8_t aN[2];
uint8_t n2 = pCmprsor->aBuf[0][0] >> 4; aN[0] = pCmprsor->aBuf[0][n] & 0xf;
aN[1] = pCmprsor->aBuf[0][n] >> 4;
n++; n++;
vZigzag = 0; for (int32_t i = 0; i < 2; i++) {
for (uint8_t i = 0; i < n1; i++) { vZigzag = 0;
vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i)); for (uint8_t j = 0; j < aN[i]; j++) {
n++; vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (8 * j));
} n++;
int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag); }
if (n == 2) {
delPrev = 0;
valPrev = delta_of_delta;
} else {
delPrev = delta_of_delta + delPrev;
valPrev = delPrev + valPrev;
}
memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &valPrev, sizeof(int64_t)); int64_t delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag);
pCmprsor->nBuf[1] += sizeof(int64_t); if (pCmprsor->nBuf[1] == 0) {
delta = 0;
value = delta_of_delta;
} else {
delta = delta_of_delta + delta;
value = delta + value;
}
if (n >= pCmprsor->nBuf[0]) break; memcpy(pCmprsor->aBuf[1] + pCmprsor->nBuf[1], &value, sizeof(int64_t));
pCmprsor->nBuf[1] += sizeof(int64_t);
vZigzag = 0; if (n >= pCmprsor->nBuf[0]) break;
for (uint8_t i = 0; i < n2; i++) {
vZigzag |= (((uint64_t)pCmprsor->aBuf[0][n]) << (sizeof(int64_t) * i));
n++;
} }
delta_of_delta = ZIGZAG_DECODE(int64_t, vZigzag);
delPrev = delta_of_delta + delPrev;
valPrev = delPrev + valPrev;
} }
uint8_t *pBuf = pCmprsor->aBuf[0]; ASSERT(n == pCmprsor->nBuf[0]);
pCmprsor->aBuf[0] = pCmprsor->aBuf[1];
pCmprsor->aBuf[1] = pBuf;
pCmprsor->nBuf[0] = pCmprsor->nBuf[1];
} else {
// TODO
}
pCmprsor->aBuf[0][0] = 0; if (pCmprsor->autoAlloc) {
pCmprsor->ts_copy = 1; code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[1] + 1);
if (code) return code;
}
memcpy(pCmprsor->aBuf[0] + 1, pCmprsor->aBuf[1], pCmprsor->nBuf[1]);
pCmprsor->nBuf[0] = 1 + pCmprsor->nBuf[1];
}
pCmprsor->aBuf[0][0] = MODE_NOCOMPRESS;
return code; return code;
} }
static int32_t tCompTimestamp(SCompressor *pCmprsor, TSKEY ts) { static int32_t tCompTimestamp(SCompressor *pCmprsor, int64_t ts) {
int32_t code = 0; int32_t code = 0;
if (pCmprsor->ts_n == 0) { ASSERT(pCmprsor->type == TSDB_DATA_TYPE_TIMESTAMP);
pCmprsor->ts_prev_val = ts;
pCmprsor->ts_prev_delta = -ts;
}
if (pCmprsor->ts_copy) goto _copy_exit;
if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) {
code = tCompSetCopyMode(pCmprsor);
if (code) return code;
goto _copy_exit;
}
int64_t delta = ts - pCmprsor->ts_prev_val;
if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) { if (pCmprsor->aBuf[0][0] == MODE_COMPRESS) {
code = tCompSetCopyMode(pCmprsor); if (pCmprsor->ts_n == 0) {
if (code) return code; pCmprsor->ts_prev_val = ts;
goto _copy_exit; pCmprsor->ts_prev_delta = -ts;
} }
int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta; if (!I64_SAFE_ADD(ts, -pCmprsor->ts_prev_val)) {
uint64_t zigzag_value = ZIGZAG_ENCODE(int64_t, delta_of_delta); code = tCompSetCopyMode(pCmprsor);
if (code) return code;
goto _copy_cmpr;
}
int64_t delta = ts - pCmprsor->ts_prev_val;
pCmprsor->ts_prev_val = ts; if (!I64_SAFE_ADD(delta, -pCmprsor->ts_prev_delta)) {
pCmprsor->ts_prev_delta = delta; code = tCompSetCopyMode(pCmprsor);
if (code) return code;
goto _copy_cmpr;
}
int64_t delta_of_delta = delta - pCmprsor->ts_prev_delta;
uint64_t vZigzag = ZIGZAG_ENCODE(int64_t, delta_of_delta);
if (pCmprsor->ts_n & 0x1 == 0) { pCmprsor->ts_prev_val = ts;
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17 /*sizeof(int64_t) * 2 + 1*/); pCmprsor->ts_prev_delta = delta;
if (code) return code;
pCmprsor->ts_flag_p = &pCmprsor->aBuf[0][pCmprsor->nBuf[0]]; if ((pCmprsor->ts_n & 0x1) == 0) {
pCmprsor->nBuf[0]++; if (pCmprsor->autoAlloc) {
pCmprsor->ts_flag_p[0] = 0; code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + 17);
if (code) return code;
}
while (zigzag_value) { pCmprsor->ts_flag_p = pCmprsor->aBuf[0] + pCmprsor->nBuf[0];
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff);
pCmprsor->nBuf[0]++; pCmprsor->nBuf[0]++;
pCmprsor->ts_flag_p[0]++; pCmprsor->ts_flag_p[0] = 0;
while (vZigzag) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff);
pCmprsor->nBuf[0]++;
pCmprsor->ts_flag_p[0]++;
vZigzag >>= 8;
}
} else {
while (vZigzag) {
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (vZigzag & 0xff);
pCmprsor->nBuf[0]++;
pCmprsor->ts_flag_p[0] += 0x10;
vZigzag >>= 8;
}
} }
} else { } else {
while (zigzag_value) { _copy_cmpr:
pCmprsor->aBuf[0][pCmprsor->nBuf[0]] = (zigzag_value & 0xff); if (pCmprsor->autoAlloc) {
pCmprsor->nBuf[0]++; code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(ts));
pCmprsor->ts_flag_p += (uint8_t)0x10; if (code) return code;
} }
}
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts));
pCmprsor->nBuf[0] += sizeof(ts);
}
pCmprsor->ts_n++; pCmprsor->ts_n++;
return code;
_copy_exit:
code = tRealloc(&pCmprsor->aBuf[0], pCmprsor->nBuf[0] + sizeof(int64_t));
if (code) return code;
memcpy(pCmprsor->aBuf[0] + pCmprsor->nBuf[0], &ts, sizeof(ts));
pCmprsor->nBuf[0] += sizeof(ts);
pCmprsor->ts_n++;
return code; return code;
} }
...@@ -1432,17 +1431,22 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) { ...@@ -1432,17 +1431,22 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) {
return code; return code;
} }
int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg, int8_t autoAlloc) {
int32_t code = 0; int32_t code = 0;
pCmprsor->type = type; pCmprsor->type = type;
pCmprsor->cmprAlg = cmprAlg; pCmprsor->cmprAlg = cmprAlg;
pCmprsor->nBuf[0] = 0; // (todo) may or may not +/- 1 pCmprsor->autoAlloc = autoAlloc;
switch (type) { switch (type) {
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
pCmprsor->ts_copy = 0;
pCmprsor->ts_n = 0; pCmprsor->ts_n = 0;
pCmprsor->ts_prev_val = 0;
pCmprsor->ts_prev_delta = 0;
pCmprsor->ts_flag_p = NULL;
pCmprsor->aBuf[0][0] = MODE_COMPRESS;
pCmprsor->nBuf[0] = 1;
pCmprsor->nBuf[1] = 0;
break; break;
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
pCmprsor->bool_n = 0; pCmprsor->bool_n = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册