Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
832f2fb7
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
832f2fb7
编写于
9月 16, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more code
上级
6b45cf18
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
156 addition
and
8 deletion
+156
-8
source/dnode/vnode/src/tsdb/tsdbCompress.c
source/dnode/vnode/src/tsdb/tsdbCompress.c
+156
-8
未找到文件。
source/dnode/vnode/src/tsdb/tsdbCompress.c
浏览文件 @
832f2fb7
...
@@ -16,6 +16,8 @@
...
@@ -16,6 +16,8 @@
#include "lz4.h"
#include "lz4.h"
#include "tsdb.h"
#include "tsdb.h"
#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a)))
typedef
struct
{
typedef
struct
{
int8_t
type
;
int8_t
type
;
int8_t
cmprAlg
;
int8_t
cmprAlg
;
...
@@ -24,7 +26,11 @@ typedef struct {
...
@@ -24,7 +26,11 @@ typedef struct {
union
{
union
{
// Timestamp ----
// Timestamp ----
struct
{
struct
{
/* data */
int8_t
ts_copy
;
int32_t
ts_n
;
int64_t
ts_prev_val
;
int64_t
ts_prev_delta
;
uint8_t
*
ts_flag_p
;
};
};
// Integer ----
// Integer ----
struct
{
struct
{
...
@@ -46,15 +52,133 @@ typedef struct {
...
@@ -46,15 +52,133 @@ typedef struct {
}
SCompressor
;
}
SCompressor
;
// Timestamp =====================================================
// Timestamp =====================================================
static
int32_t
tCompSetCopyMode
(
SCompressor
*
pCmprsor
)
{
int32_t
code
=
0
;
if
(
pCmprsor
->
ts_n
)
{
code
=
tRealloc
(
&
pCmprsor
->
aBuf
[
1
],
sizeof
(
int64_t
)
*
(
pCmprsor
->
ts_n
+
1
));
if
(
code
)
return
code
;
pCmprsor
->
nBuf
[
1
]
=
1
;
int64_t
n
=
1
;
int64_t
valPrev
;
int64_t
delPrev
;
uint64_t
vZigzag
;
while
(
n
<
pCmprsor
->
nBuf
[
0
])
{
uint8_t
n1
=
pCmprsor
->
aBuf
[
0
][
0
]
&
0xf
;
uint8_t
n2
=
pCmprsor
->
aBuf
[
0
][
0
]
>>
4
;
n
++
;
vZigzag
=
0
;
for
(
uint8_t
i
=
0
;
i
<
n1
;
i
++
)
{
vZigzag
|=
(((
uint64_t
)
pCmprsor
->
aBuf
[
0
][
n
])
<<
(
sizeof
(
int64_t
)
*
i
));
n
++
;
}
int64_t
delta_of_delta
=
ZIGZAGD
(
int64_t
,
vZigzag
);
if
(
n
==
2
)
{
delPrev
=
0
;
valPrev
=
delta_of_delta
;
}
else
{
delPrev
=
delta_of_delta
+
delPrev
;
valPrev
=
delPrev
+
valPrev
;
}
memcpy
(
pCmprsor
->
aBuf
[
1
]
+
pCmprsor
->
nBuf
[
1
],
&
valPrev
,
sizeof
(
int64_t
));
pCmprsor
->
nBuf
[
1
]
+=
sizeof
(
int64_t
);
if
(
n
>=
pCmprsor
->
nBuf
[
0
])
break
;
vZigzag
=
0
;
for
(
uint8_t
i
=
0
;
i
<
n2
;
i
++
)
{
vZigzag
|=
(((
uint64_t
)
pCmprsor
->
aBuf
[
0
][
n
])
<<
(
sizeof
(
int64_t
)
*
i
));
n
++
;
}
int64_t
delta_of_delta
=
ZIGZAGD
(
int64_t
,
vZigzag
);
delPrev
=
delta_of_delta
+
delPrev
;
valPrev
=
delPrev
+
valPrev
;
}
uint8_t
*
pBuf
=
pCmprsor
->
aBuf
[
0
];
pCmprsor
->
aBuf
[
0
]
=
pCmprsor
->
aBuf
[
1
];
pCmprsor
->
aBuf
[
1
]
=
pBuf
;
pCmprsor
->
nBuf
[
0
]
=
pCmprsor
->
nBuf
[
1
];
}
else
{
// TODO
}
pCmprsor
->
aBuf
[
0
][
0
]
=
0
;
pCmprsor
->
ts_copy
=
1
;
return
code
;
}
static
int32_t
tCompTimestamp
(
SCompressor
*
pCmprsor
,
TSKEY
ts
)
{
static
int32_t
tCompTimestamp
(
SCompressor
*
pCmprsor
,
TSKEY
ts
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
// TODO
if
(
pCmprsor
->
ts_n
==
0
)
{
pCmprsor
->
ts_prev_val
=
ts
;
pCmprsor
->
ts_prev_delta
=
-
ts
;
}
if
(
pCmprsor
->
ts_copy
)
goto
_copy_exit
;
if
(
!
I64_SAFE_ADD
(
ts
,
-
pCmprsor
->
ts_prev_val
))
{
code
=
tCompSetCopyMode
(
pCmprsor
);
if
(
code
)
return
code
;
goto
_copy_exit
;
}
int64_t
delta
=
ts
-
pCmprsor
->
ts_prev_val
;
if
(
!
I64_SAFE_ADD
(
delta
,
-
pCmprsor
->
ts_prev_delta
))
{
code
=
tCompSetCopyMode
(
pCmprsor
);
if
(
code
)
return
code
;
goto
_copy_exit
;
}
int64_t
delta_of_delta
=
delta
-
pCmprsor
->
ts_prev_delta
;
uint64_t
zigzag_value
=
ZIGZAGE
(
int64_t
,
delta_of_delta
);
pCmprsor
->
ts_prev_val
=
ts
;
pCmprsor
->
ts_prev_delta
=
delta
;
if
(
pCmprsor
->
ts_n
&
0x1
==
0
)
{
code
=
tRealloc
(
&
pCmprsor
->
aBuf
[
0
],
pCmprsor
->
nBuf
[
0
]
+
17
/*sizeof(int64_t) * 2 + 1*/
);
if
(
code
)
return
code
;
pCmprsor
->
ts_flag_p
=
&
pCmprsor
->
aBuf
[
0
][
pCmprsor
->
nBuf
[
0
]];
pCmprsor
->
nBuf
[
0
]
++
;
pCmprsor
->
ts_flag_p
[
0
]
=
0
;
while
(
zigzag_value
)
{
pCmprsor
->
aBuf
[
0
][
pCmprsor
->
nBuf
[
0
]]
=
(
zigzag_value
&
0xff
);
pCmprsor
->
nBuf
[
0
]
++
;
pCmprsor
->
ts_flag_p
[
0
]
++
;
}
}
else
{
while
(
zigzag_value
)
{
pCmprsor
->
aBuf
[
0
][
pCmprsor
->
nBuf
[
0
]]
=
(
zigzag_value
&
0xff
);
pCmprsor
->
nBuf
[
0
]
++
;
pCmprsor
->
ts_flag_p
+=
(
uint8_t
)
0x10
;
}
}
pCmprsor
->
ts_n
++
;
return
code
;
_copy_exit:
code
=
tRealloc
(
&
pCmprsor
->
aBuf
[
0
],
pCmprsor
->
nBuf
[
0
]
+
sizeof
(
int64_t
));
if
(
code
)
return
code
;
memcpy
(
pCmprsor
->
aBuf
[
0
]
+
pCmprsor
->
nBuf
[
0
],
&
ts
,
sizeof
(
ts
));
pCmprsor
->
nBuf
[
0
]
+=
sizeof
(
ts
);
pCmprsor
->
ts_n
++
;
return
code
;
return
code
;
}
}
// Integer =====================================================
// Integer =====================================================
#define I64_SAFE_ADD(a, b) (((a) >= 0 && (b) <= INT64_MAX - (b)) || ((a) < 0 && (b) >= INT64_MIN - (a)))
#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL)
#define SIMPLE8B_MAX ((uint64_t)1152921504606846974LL)
static
int32_t
tCompI64
(
SCompressor
*
pCmprsor
,
int64_t
val
)
{
static
int32_t
tCompI64
(
SCompressor
*
pCmprsor
,
int64_t
val
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
...
@@ -110,7 +234,8 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t
...
@@ -110,7 +234,8 @@ static int32_t tCompBinary(SCompressor *pCmprsor, const uint8_t *pData, int32_t
}
}
// Bool =====================================================
// Bool =====================================================
static
uint8_t
BOOL_CMPR_TABLE
[]
=
{
0
b01
,
0
b0100
,
0
b010000
,
0
b01000000
};
static
const
uint8_t
BOOL_CMPR_TABLE
[]
=
{
0
b01
,
0
b0100
,
0
b010000
,
0
b01000000
};
static
int32_t
tCompBool
(
SCompressor
*
pCmprsor
,
bool
vBool
)
{
static
int32_t
tCompBool
(
SCompressor
*
pCmprsor
,
bool
vBool
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
...
@@ -122,6 +247,9 @@ static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) {
...
@@ -122,6 +247,9 @@ static int32_t tCompBool(SCompressor *pCmprsor, bool vBool) {
if
(
mod4
==
3
)
{
if
(
mod4
==
3
)
{
pCmprsor
->
nBuf
[
0
]
++
;
pCmprsor
->
nBuf
[
0
]
++
;
pCmprsor
->
aBuf
[
0
][
pCmprsor
->
nBuf
[
0
]]
=
0
;
pCmprsor
->
aBuf
[
0
][
pCmprsor
->
nBuf
[
0
]]
=
0
;
code
=
tRealloc
(
&
pCmprsor
->
aBuf
[
0
],
pCmprsor
->
nBuf
[
0
]);
if
(
code
)
goto
_exit
;
}
}
_exit:
_exit:
...
@@ -138,6 +266,13 @@ int32_t tCompressorCreate(SCompressor **ppCmprsor) {
...
@@ -138,6 +266,13 @@ int32_t tCompressorCreate(SCompressor **ppCmprsor) {
goto
_exit
;
goto
_exit
;
}
}
code
=
tRealloc
(
&
(
*
ppCmprsor
)
->
aBuf
[
0
],
1024
);
if
(
code
)
{
taosMemoryFree
(
*
ppCmprsor
);
*
ppCmprsor
=
NULL
;
goto
_exit
;
}
_exit:
_exit:
return
code
;
return
code
;
}
}
...
@@ -146,9 +281,12 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) {
...
@@ -146,9 +281,12 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) {
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
pCmprsor
)
{
if
(
pCmprsor
)
{
for
(
int32_t
iBuf
=
0
;
iBuf
<
sizeof
(
pCmprsor
->
aBuf
)
/
sizeof
(
pCmprsor
->
aBuf
[
0
]);
iBuf
++
)
{
int32_t
nBuf
=
sizeof
(
pCmprsor
->
aBuf
)
/
sizeof
(
pCmprsor
->
aBuf
[
0
]);
for
(
int32_t
iBuf
=
0
;
iBuf
<
nBuf
;
iBuf
++
)
{
tFree
(
pCmprsor
->
aBuf
[
iBuf
]);
tFree
(
pCmprsor
->
aBuf
[
iBuf
]);
}
}
taosMemoryFree
(
pCmprsor
);
}
}
return
code
;
return
code
;
...
@@ -159,13 +297,17 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
...
@@ -159,13 +297,17 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
pCmprsor
->
type
=
type
;
pCmprsor
->
type
=
type
;
pCmprsor
->
cmprAlg
=
cmprAlg
;
pCmprsor
->
cmprAlg
=
cmprAlg
;
pCmprsor
->
nBuf
[
0
]
=
0
;
// (todo) may or may not +/- 1
switch
(
type
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
pCmprsor
->
ts_copy
=
0
;
pCmprsor
->
ts_n
=
0
;
break
;
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_BOOL
:
pCmprsor
->
bool_n
=
0
;
pCmprsor
->
bool_n
=
0
;
pCmprsor
->
bool_b
=
0
;
pCmprsor
->
aBuf
[
0
][
0
]
=
0
;
break
;
break
;
default:
default:
break
;
break
;
}
}
...
@@ -200,3 +342,9 @@ int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData)
...
@@ -200,3 +342,9 @@ int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData)
_exit:
_exit:
return
code
;
return
code
;
}
}
int32_t
tCompress
(
SCompressor
*
pCmprsor
,
void
*
pData
,
int64_t
nData
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录