Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
593f694f
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
593f694f
编写于
11月 21, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into enh/row_optimize
上级
e69821a6
9ffcef81
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
278 addition
and
75 deletion
+278
-75
docs/zh/05-get-started/index.md
docs/zh/05-get-started/index.md
+12
-1
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+8
-3
packaging/docker/DockerfileCloud
packaging/docker/DockerfileCloud
+3
-0
packaging/docker/run.sh
packaging/docker/run.sh
+175
-7
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+1
-1
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+79
-63
未找到文件。
docs/zh/05-get-started/index.md
浏览文件 @
593f694f
...
...
@@ -18,7 +18,18 @@ import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/>
```
### 加入 TDengine 官方社区
## 学习 TDengine 知识地图
TDengine 知识地图中涵盖了 TDengine 的各种知识点,揭示了各概念实体之间的调用关系和数据流向。学习和了解 TDengine 知识地图有助于你快速掌握 TDengine 的知识体系。
<figure>
<center>
<a
href=
"/img/tdengine-map.svg"
target=
"_blank"
><img
src=
"/img/tdengine-map.svg"
width=
"80%"
/></a>
<figcaption>
图 1. TDengine 知识地图
</figcaption>
</center>
</figure>
## 加入 TDengine 官方社区
微信扫描以下二维码,学习了解 TDengine 的最新技术,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。
...
...
include/libs/stream/streamState.h
浏览文件 @
593f694f
...
...
@@ -27,8 +27,7 @@ typedef struct SStreamTask SStreamTask;
typedef
bool
(
*
state_key_cmpr_fn
)(
void
*
pKey1
,
void
*
pKey2
);
// incremental state storage
typedef
struct
{
typedef
struct
STdbState
{
SStreamTask
*
pOwner
;
TDB
*
db
;
TTB
*
pStateDb
;
...
...
@@ -37,7 +36,12 @@ typedef struct {
TTB
*
pSessionStateDb
;
TTB
*
pParNameDb
;
TXN
txn
;
int32_t
number
;
}
STdbState
;
// incremental state storage
typedef
struct
{
STdbState
*
pTdbState
;
int32_t
number
;
}
SStreamState
;
SStreamState
*
streamStateOpen
(
char
*
path
,
SStreamTask
*
pTask
,
bool
specPath
,
int32_t
szPage
,
int32_t
pages
);
...
...
@@ -45,6 +49,7 @@ void streamStateClose(SStreamState* pState);
int32_t
streamStateBegin
(
SStreamState
*
pState
);
int32_t
streamStateCommit
(
SStreamState
*
pState
);
int32_t
streamStateAbort
(
SStreamState
*
pState
);
void
streamStateDestroy
(
SStreamState
*
pState
);
typedef
struct
{
TBC
*
pCur
;
...
...
packaging/docker/DockerfileCloud
浏览文件 @
593f694f
...
...
@@ -7,6 +7,9 @@ ARG dirName
ARG cpuType
RUN echo ${pkgFile} && echo ${dirName}
RUN apt update
RUN apt install -y curl
COPY ${pkgFile} /root/
ENV TINI_VERSION v0.19.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini
...
...
packaging/docker/run.sh
100644 → 100755
浏览文件 @
593f694f
#!/bin/bash
TAOS_RUN_TAOSBENCHMARK_TEST_ONCE
=
0
#ADMIN_URL=${ADMIN_URL:-http://172.26.10.84:10001}
TAOSD_STARTUP_TIMEOUT_SECOND
=
${
TAOSD_STARTUP_TIMEOUT_SECOND
:-
160
}
TAOS_TIMEOUT_SECOND
=
${
TAOS_TIMEOUT_SECOND
:-
5
}
BACKUP_CORE_FOLDER
=
/data/corefile
ALERT_URL
=
app/system/alert/add
echo
"ADMIN_URL:
${
ADMIN_URL
}
"
echo
"TAOS_TIMEOUT_SECOND:
${
TAOS_TIMEOUT_SECOND
}
"
function
set_service_state
()
{
#echo "set service state: $1, $2"
service_state
=
"
$1
"
service_msg
=
"
$2
"
}
set_service_state
"init"
"ok"
app_name
=
`
hostname
|cut
-d
\-
-f1
`
function
check_taosd
()
{
timeout
$TAOS_TIMEOUT_SECOND
taos
-s
"show databases;"
>
/dev/null
local
ret
=
$?
if
[
$ret
-ne
0
]
;
then
echo
"
`
date
`
check taosd error
$ret
"
if
[
"x
$1
"
!=
"xignore"
]
;
then
set_service_state
"error"
"taos check failed
$ret
"
fi
else
set_service_state
"ready"
"ok"
fi
}
function
post_error_msg
()
{
if
[
!
-z
"
${
ADMIN_URL
}
"
]
;
then
taos_version
=
`
taos
--version
`
echo
"app_name:
${
app_name
}
"
echo
"service_state:
${
service_state
}
"
echo
"
`
date
`
service_msg:
${
service_msg
}
"
echo
"
${
taos_version
}
"
curl
-X
POST
-H
"Content-Type: application/json"
\
-d
"{
\"
appName
\"
:
\"
${
app_name
}
\"
,
\
\"
alertLevel
\"
:
\"
${
service_state
}
\"
,
\
\"
taosVersion
\"
:
\"
${
taos_version
}
\"
,
\
\"
alertMsg
\"
:
\"
${
service_msg
}
\"
}"
\
${
ADMIN_URL
}
/
${
ALERT_URL
}
fi
}
function
check_taosd_exit_type
()
{
local
core_pattern
=
`
cat
/proc/sys/kernel/core_pattern
`
echo
"
$core_pattern
"
|
grep
-q
"^/"
if
[
$?
-eq
0
]
;
then
core_folder
=
`
dirname
$core_pattern
`
core_prefix
=
`
basename
$core_pattern
|
sed
"s/%.*//"
`
else
core_folder
=
`
pwd
`
core_prefix
=
"
$core_pattern
"
fi
local
core_files
=
`
ls
$core_folder
|
grep
"^
${
core_prefix
}
"
`
if
[
!
-z
"
$core_files
"
]
;
then
# move core files to another folder
mkdir
-p
${
BACKUP_CORE_FOLDER
}
mv
${
core_folder
}
/
${
core_prefix
}*
${
BACKUP_CORE_FOLDER
}
/
set_service_state
"error"
"taosd exit with core file"
else
set_service_state
"error"
"taosd exit without core file"
fi
}
disk_usage_level
=(
60 80 99
)
current_disk_level
=
0
disk_state
=
"ok"
disk_msg
=
"ok"
get_usage_ok
=
"yes"
function
post_disk_error_msg
()
{
if
[
!
-z
"
${
ADMIN_URL
}
"
]
;
then
taos_version
=
`
taos
--version
`
echo
"app_name:
${
app_name
}
"
echo
"disk_state:
${
disk_state
}
"
echo
"
`
date
`
disk_msg:
${
disk_msg
}
"
echo
"
${
taos_version
}
"
curl
-X
POST
-H
"Content-Type: application/json"
\
-d
"{
\"
appName
\"
:
\"
${
app_name
}
\"
,
\
\"
alertLevel
\"
:
\"
${
disk_state
}
\"
,
\
\"
taosVersion
\"
:
\"
${
taos_version
}
\"
,
\
\"
alertMsg
\"
:
\"
${
disk_msg
}
\"
}"
\
${
ADMIN_URL
}
/
${
ALERT_URL
}
fi
}
function
check_disk
()
{
local
folder
=
`
cat
/etc/taos/taos.cfg|grep
-v
"^#"
|grep dataDir|awk
'{print $NF}'
`
if
[
-z
"
$folder
"
]
;
then
folder
=
"/var/lib/taos"
fi
local
mount_point
=
"
$folder
"
local
usage
=
""
while
[
-z
"
$usage
"
]
;
do
usage
=
`
df
-h
|grep
-w
"
${
mount_point
}
"
|awk
'{print $5}'
|grep
-v
Use|sed
"s/%
$/
/"
`
if
[
"x
${
mount_point
}
"
=
"x/"
]
;
then
break
fi
mount_point
=
`
dirname
${
mount_point
}
`
done
if
[
-z
"
$usage
"
]
;
then
disk_state
=
"error"
disk_msg
=
"cannot get disk usage"
if
[
"
$get_usage_ok
"
=
"yes"
]
;
then
post_disk_error_msg
get_usage_ok
=
"no"
fi
else
get_usage_ok
=
"yes"
local
current_level
=
0
for
level
in
${
disk_usage_level
[*]
}
;
do
if
[
${
usage
}
-ge
${
level
}
]
;
then
disk_state
=
"error"
disk_msg
=
"disk usage over
${
level
}
%"
current_level
=
${
level
}
fi
done
if
[
${
current_level
}
-gt
${
current_disk_level
}
]
;
then
post_disk_error_msg
elif
[
${
current_level
}
-lt
${
current_disk_level
}
]
;
then
echo
"disk usage reduced from
${
current_disk_level
}
to
${
current_level
}
"
fi
current_disk_level
=
${
current_level
}
fi
}
function
run_taosd
()
{
taosd
set_service_state
"error"
"taosd exit"
# post error msg
# check crash or OOM
check_taosd_exit_type
post_error_msg
}
function
print_service_state_change
()
{
if
[
"x
$1
"
!=
"x
${
service_state
}
"
]
;
then
echo
"
`
date
`
service state:
${
service_state
}
,
${
service_msg
}
"
fi
}
taosd_start_time
=
`
date
+%s
`
while
((
1
))
do
check_disk
# echo "outer loop: $a"
sleep
10
output
=
`
taos
-k
`
status
=
${
output
:0:1
}
output
=
`
timeout
$TAOS_TIMEOUT_SECOND
taos
-k
`
if
[
-z
"
${
output
}
"
]
;
then
echo
"
`
date
`
taos -k error"
status
=
""
else
status
=
${
output
:0:1
}
fi
# echo $output
# echo $status
if
[
"
$status
"
x
=
"0"
x
]
then
taosd &
# taosd_start_time=`date +%s`
run_taosd &
fi
# echo "$status"x "$TAOS_RUN_TAOSBENCHMARK_TEST"x "$TAOS_RUN_TAOSBENCHMARK_TEST_ONCE"x
if
[
"
$status
"
x
=
"2"
x
]
&&
[
"
$TAOS_RUN_TAOSBENCHMARK_TEST
"
x
=
"1"
x
]
&&
[
"
$TAOS_RUN_TAOSBENCHMARK_TEST_ONCE
"
x
=
"0"
x
]
...
...
@@ -24,13 +168,37 @@ do
taos
-s
"select stable_name from information_schema.ins_stables where db_name = 'test';"
|grep
-q
-w
meters
if
[
$?
-ne
0
]
;
then
taosBenchmark
-y
-t
1000
-n
1000
-S
900000
taos
-s
"create user admin_user pass 'NDS65R6t' sysinfo 0;"
taos
-s
"GRANT ALL on test.* to admin_user;"
taos
-s
"create user admin_user pass 'NDS65R6t' sysinfo 0;"
taos
-s
"GRANT ALL on test.* to admin_user;"
fi
fi
# check taosd status
if
[
"
$service_state
"
=
"ready"
]
;
then
# check taosd status
check_taosd
print_service_state_change
"ready"
if
[
"
$service_state
"
=
"error"
]
;
then
post_error_msg
fi
elif
[
"
$service_state
"
=
"init"
]
;
then
check_taosd
"ignore"
# check timeout
current_time
=
`
date
+%s
`
time_elapsed
=
$((
current_time
-
taosd_start_time
))
if
[
${
time_elapsed
}
-gt
${
TAOSD_STARTUP_TIMEOUT_SECOND
}
]
;
then
set_service_state
"error"
"taosd startup timeout"
post_error_msg
fi
print_service_state_change
"init"
elif
[
"
$service_state
"
=
"error"
]
;
then
# check taosd status
check_taosd
print_service_state_change
"error"
fi
# check taosadapter
nc
-z
localhost 6041
if
[
$?
-ne
0
]
;
then
taosadapter &
taosadapter &
fi
sleep
30
done
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
593f694f
...
...
@@ -1698,7 +1698,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol
size
+=
pBlockCol
->
szBitmap
;
// offset
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
))
{
if
(
IS_VAR_DATA_TYPE
(
pColData
->
type
)
&&
pColData
->
flag
!=
(
HAS_NULL
|
HAS_NONE
)
)
{
code
=
tsdbCmprData
((
uint8_t
*
)
pColData
->
aOffset
,
sizeof
(
int32_t
)
*
pColData
->
nVal
,
TSDB_DATA_TYPE_INT
,
cmprAlg
,
ppOut
,
nOut
+
size
,
&
pBlockCol
->
szOffset
,
ppBuf
);
if
(
code
)
goto
_exit
;
...
...
source/libs/stream/src/streamState.c
浏览文件 @
593f694f
...
...
@@ -114,6 +114,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pState
->
pTdbState
=
taosMemoryCalloc
(
1
,
sizeof
(
STdbState
));
if
(
pState
->
pTdbState
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
streamStateDestroy
(
pState
);
return
NULL
;
}
char
statePath
[
1024
];
if
(
!
specPath
)
{
...
...
@@ -122,31 +128,34 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
memset
(
statePath
,
0
,
1024
);
tstrncpy
(
statePath
,
path
,
1024
);
}
if
(
tdbOpen
(
statePath
,
szPage
,
pages
,
&
pState
->
db
,
0
)
<
0
)
{
if
(
tdbOpen
(
statePath
,
szPage
,
pages
,
&
pState
->
pTdbState
->
db
,
0
)
<
0
)
{
goto
_err
;
}
// open state storage backend
if
(
tdbTbOpen
(
"state.db"
,
sizeof
(
SStateKey
),
-
1
,
stateKeyCmpr
,
pState
->
db
,
&
pState
->
pStateDb
,
0
)
<
0
)
{
if
(
tdbTbOpen
(
"state.db"
,
sizeof
(
SStateKey
),
-
1
,
stateKeyCmpr
,
pState
->
pTdbState
->
db
,
&
pState
->
pTdbState
->
pStateDb
,
0
)
<
0
)
{
goto
_err
;
}
// todo refactor
if
(
tdbTbOpen
(
"fill.state.db"
,
sizeof
(
SWinKey
),
-
1
,
winKeyCmpr
,
pState
->
db
,
&
pState
->
pFillStateDb
,
0
)
<
0
)
{
if
(
tdbTbOpen
(
"fill.state.db"
,
sizeof
(
SWinKey
),
-
1
,
winKeyCmpr
,
pState
->
pTdbState
->
db
,
&
pState
->
pTdbState
->
pFillStateDb
,
0
)
<
0
)
{
goto
_err
;
}
if
(
tdbTbOpen
(
"session.state.db"
,
sizeof
(
SStateSessionKey
),
-
1
,
stateSessionKeyCmpr
,
pState
->
db
,
&
pState
->
pSessionStateDb
,
0
)
<
0
)
{
if
(
tdbTbOpen
(
"session.state.db"
,
sizeof
(
SStateSessionKey
),
-
1
,
stateSessionKeyCmpr
,
pState
->
pTdbState
->
db
,
&
pState
->
p
TdbState
->
p
SessionStateDb
,
0
)
<
0
)
{
goto
_err
;
}
if
(
tdbTbOpen
(
"func.state.db"
,
sizeof
(
STupleKey
),
-
1
,
STupleKeyCmpr
,
pState
->
db
,
&
pState
->
pFuncStateDb
,
0
)
<
0
)
{
if
(
tdbTbOpen
(
"func.state.db"
,
sizeof
(
STupleKey
),
-
1
,
STupleKeyCmpr
,
pState
->
pTdbState
->
db
,
&
pState
->
pTdbState
->
pFuncStateDb
,
0
)
<
0
)
{
goto
_err
;
}
if
(
tdbTbOpen
(
"parname.state.db"
,
sizeof
(
int64_t
),
TSDB_TABLE_NAME_LEN
,
NULL
,
pState
->
db
,
&
pState
->
pParNameDb
,
0
)
<
0
)
{
if
(
tdbTbOpen
(
"parname.state.db"
,
sizeof
(
int64_t
),
TSDB_TABLE_NAME_LEN
,
NULL
,
pState
->
pTdbState
->
db
,
&
pState
->
pTdbState
->
pParNameDb
,
0
)
<
0
)
{
goto
_err
;
}
...
...
@@ -154,117 +163,117 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
goto
_err
;
}
pState
->
pOwner
=
pTask
;
pState
->
p
TdbState
->
p
Owner
=
pTask
;
return
pState
;
_err:
tdbTbClose
(
pState
->
pStateDb
);
tdbTbClose
(
pState
->
pFuncStateDb
);
tdbTbClose
(
pState
->
pFillStateDb
);
tdbTbClose
(
pState
->
pSessionStateDb
);
tdbTbClose
(
pState
->
pParNameDb
);
tdbClose
(
pState
->
db
);
taosMemoryFree
(
pState
);
tdbTbClose
(
pState
->
p
TdbState
->
p
StateDb
);
tdbTbClose
(
pState
->
p
TdbState
->
p
FuncStateDb
);
tdbTbClose
(
pState
->
p
TdbState
->
p
FillStateDb
);
tdbTbClose
(
pState
->
p
TdbState
->
p
SessionStateDb
);
tdbTbClose
(
pState
->
p
TdbState
->
p
ParNameDb
);
tdbClose
(
pState
->
pTdbState
->
db
);
streamStateDestroy
(
pState
);
return
NULL
;
}
void
streamStateClose
(
SStreamState
*
pState
)
{
tdbCommit
(
pState
->
db
,
&
p
State
->
txn
);
tdbPostCommit
(
pState
->
db
,
&
p
State
->
txn
);
tdbTbClose
(
pState
->
pStateDb
);
tdbTbClose
(
pState
->
pFuncStateDb
);
tdbTbClose
(
pState
->
pFillStateDb
);
tdbTbClose
(
pState
->
pSessionStateDb
);
tdbTbClose
(
pState
->
pParNameDb
);
tdbClose
(
pState
->
db
);
tdbCommit
(
pState
->
pTdbState
->
db
,
&
pState
->
pTdb
State
->
txn
);
tdbPostCommit
(
pState
->
pTdbState
->
db
,
&
pState
->
pTdb
State
->
txn
);
tdbTbClose
(
pState
->
p
TdbState
->
p
StateDb
);
tdbTbClose
(
pState
->
p
TdbState
->
p
FuncStateDb
);
tdbTbClose
(
pState
->
p
TdbState
->
p
FillStateDb
);
tdbTbClose
(
pState
->
p
TdbState
->
p
SessionStateDb
);
tdbTbClose
(
pState
->
p
TdbState
->
p
ParNameDb
);
tdbClose
(
pState
->
pTdbState
->
db
);
taosMemoryFree
(
pState
);
streamStateDestroy
(
pState
);
}
int32_t
streamStateBegin
(
SStreamState
*
pState
)
{
if
(
tdbTxnOpen
(
&
pState
->
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
if
(
tdbTxnOpen
(
&
pState
->
pTdbState
->
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
return
-
1
;
}
if
(
tdbBegin
(
pState
->
db
,
&
p
State
->
txn
)
<
0
)
{
tdbTxnClose
(
&
pState
->
txn
);
if
(
tdbBegin
(
pState
->
pTdbState
->
db
,
&
pState
->
pTdb
State
->
txn
)
<
0
)
{
tdbTxnClose
(
&
pState
->
pTdbState
->
txn
);
return
-
1
;
}
return
0
;
}
int32_t
streamStateCommit
(
SStreamState
*
pState
)
{
if
(
tdbCommit
(
pState
->
db
,
&
p
State
->
txn
)
<
0
)
{
if
(
tdbCommit
(
pState
->
pTdbState
->
db
,
&
pState
->
pTdb
State
->
txn
)
<
0
)
{
return
-
1
;
}
if
(
tdbPostCommit
(
pState
->
db
,
&
p
State
->
txn
)
<
0
)
{
if
(
tdbPostCommit
(
pState
->
pTdbState
->
db
,
&
pState
->
pTdb
State
->
txn
)
<
0
)
{
return
-
1
;
}
memset
(
&
pState
->
txn
,
0
,
sizeof
(
TXN
));
if
(
tdbTxnOpen
(
&
pState
->
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
memset
(
&
pState
->
pTdbState
->
txn
,
0
,
sizeof
(
TXN
));
if
(
tdbTxnOpen
(
&
pState
->
pTdbState
->
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
return
-
1
;
}
if
(
tdbBegin
(
pState
->
db
,
&
p
State
->
txn
)
<
0
)
{
if
(
tdbBegin
(
pState
->
pTdbState
->
db
,
&
pState
->
pTdb
State
->
txn
)
<
0
)
{
return
-
1
;
}
return
0
;
}
int32_t
streamStateAbort
(
SStreamState
*
pState
)
{
if
(
tdbAbort
(
pState
->
db
,
&
p
State
->
txn
)
<
0
)
{
if
(
tdbAbort
(
pState
->
pTdbState
->
db
,
&
pState
->
pTdb
State
->
txn
)
<
0
)
{
return
-
1
;
}
memset
(
&
pState
->
txn
,
0
,
sizeof
(
TXN
));
if
(
tdbTxnOpen
(
&
pState
->
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
memset
(
&
pState
->
pTdbState
->
txn
,
0
,
sizeof
(
TXN
));
if
(
tdbTxnOpen
(
&
pState
->
pTdbState
->
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
return
-
1
;
}
if
(
tdbBegin
(
pState
->
db
,
&
p
State
->
txn
)
<
0
)
{
if
(
tdbBegin
(
pState
->
pTdbState
->
db
,
&
pState
->
pTdb
State
->
txn
)
<
0
)
{
return
-
1
;
}
return
0
;
}
int32_t
streamStateFuncPut
(
SStreamState
*
pState
,
const
STupleKey
*
key
,
const
void
*
value
,
int32_t
vLen
)
{
return
tdbTbUpsert
(
pState
->
p
FuncStateDb
,
key
,
sizeof
(
STupleKey
),
value
,
vLen
,
&
p
State
->
txn
);
return
tdbTbUpsert
(
pState
->
p
TdbState
->
pFuncStateDb
,
key
,
sizeof
(
STupleKey
),
value
,
vLen
,
&
pState
->
pTdb
State
->
txn
);
}
int32_t
streamStateFuncGet
(
SStreamState
*
pState
,
const
STupleKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
return
tdbTbGet
(
pState
->
pFuncStateDb
,
key
,
sizeof
(
STupleKey
),
pVal
,
pVLen
);
return
tdbTbGet
(
pState
->
p
TdbState
->
p
FuncStateDb
,
key
,
sizeof
(
STupleKey
),
pVal
,
pVLen
);
}
int32_t
streamStateFuncDel
(
SStreamState
*
pState
,
const
STupleKey
*
key
)
{
return
tdbTbDelete
(
pState
->
p
FuncStateDb
,
key
,
sizeof
(
STupleKey
),
&
p
State
->
txn
);
return
tdbTbDelete
(
pState
->
p
TdbState
->
pFuncStateDb
,
key
,
sizeof
(
STupleKey
),
&
pState
->
pTdb
State
->
txn
);
}
// todo refactor
int32_t
streamStatePut
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
const
void
*
value
,
int32_t
vLen
)
{
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
return
tdbTbUpsert
(
pState
->
p
StateDb
,
&
sKey
,
sizeof
(
SStateKey
),
value
,
vLen
,
&
p
State
->
txn
);
return
tdbTbUpsert
(
pState
->
p
TdbState
->
pStateDb
,
&
sKey
,
sizeof
(
SStateKey
),
value
,
vLen
,
&
pState
->
pTdb
State
->
txn
);
}
// todo refactor
int32_t
streamStateFillPut
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
const
void
*
value
,
int32_t
vLen
)
{
return
tdbTbUpsert
(
pState
->
p
FillStateDb
,
key
,
sizeof
(
SWinKey
),
value
,
vLen
,
&
p
State
->
txn
);
return
tdbTbUpsert
(
pState
->
p
TdbState
->
pFillStateDb
,
key
,
sizeof
(
SWinKey
),
value
,
vLen
,
&
pState
->
pTdb
State
->
txn
);
}
// todo refactor
int32_t
streamStateGet
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
return
tdbTbGet
(
pState
->
pStateDb
,
&
sKey
,
sizeof
(
SStateKey
),
pVal
,
pVLen
);
return
tdbTbGet
(
pState
->
p
TdbState
->
p
StateDb
,
&
sKey
,
sizeof
(
SStateKey
),
pVal
,
pVLen
);
}
// todo refactor
int32_t
streamStateFillGet
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
return
tdbTbGet
(
pState
->
pFillStateDb
,
key
,
sizeof
(
SWinKey
),
pVal
,
pVLen
);
return
tdbTbGet
(
pState
->
p
TdbState
->
p
FillStateDb
,
key
,
sizeof
(
SWinKey
),
pVal
,
pVLen
);
}
// todo refactor
int32_t
streamStateDel
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
return
tdbTbDelete
(
pState
->
p
StateDb
,
&
sKey
,
sizeof
(
SStateKey
),
&
p
State
->
txn
);
return
tdbTbDelete
(
pState
->
p
TdbState
->
pStateDb
,
&
sKey
,
sizeof
(
SStateKey
),
&
pState
->
pTdb
State
->
txn
);
}
int32_t
streamStateClear
(
SStreamState
*
pState
)
{
...
...
@@ -288,7 +297,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number
// todo refactor
int32_t
streamStateFillDel
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
return
tdbTbDelete
(
pState
->
p
FillStateDb
,
key
,
sizeof
(
SWinKey
),
&
p
State
->
txn
);
return
tdbTbDelete
(
pState
->
p
TdbState
->
pFillStateDb
,
key
,
sizeof
(
SWinKey
),
&
pState
->
pTdb
State
->
txn
);
}
int32_t
streamStateAddIfNotExist
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
...
...
@@ -314,7 +323,7 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV
SStreamStateCur
*
streamStateGetCur
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
tdbTbcOpen
(
pState
->
pStateDb
,
&
pCur
->
pCur
,
NULL
);
tdbTbcOpen
(
pState
->
p
TdbState
->
p
StateDb
,
&
pCur
->
pCur
,
NULL
);
int32_t
c
=
0
;
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
...
...
@@ -330,7 +339,7 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
SStreamStateCur
*
streamStateFillGetCur
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
tdbTbcOpen
(
pState
->
pFillStateDb
,
&
pCur
->
pCur
,
NULL
);
tdbTbcOpen
(
pState
->
p
TdbState
->
p
FillStateDb
,
&
pCur
->
pCur
,
NULL
);
int32_t
c
=
0
;
tdbTbcMoveTo
(
pCur
->
pCur
,
key
,
sizeof
(
SWinKey
),
&
c
);
...
...
@@ -422,7 +431,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
return
NULL
;
}
pCur
->
number
=
pState
->
number
;
if
(
tdbTbcOpen
(
pState
->
pStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
if
(
tdbTbcOpen
(
pState
->
p
TdbState
->
p
StateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
...
...
@@ -448,7 +457,7 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey*
if
(
!
pCur
)
{
return
NULL
;
}
if
(
tdbTbcOpen
(
pState
->
pFillStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
if
(
tdbTbcOpen
(
pState
->
p
TdbState
->
p
FillStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
...
...
@@ -473,7 +482,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
if
(
pCur
==
NULL
)
{
return
NULL
;
}
if
(
tdbTbcOpen
(
pState
->
pFillStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
if
(
tdbTbcOpen
(
pState
->
p
TdbState
->
p
FillStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
...
...
@@ -520,7 +529,8 @@ void streamFreeVal(void* val) { tdbFree(val); }
int32_t
streamStateSessionPut
(
SStreamState
*
pState
,
const
SSessionKey
*
key
,
const
void
*
value
,
int32_t
vLen
)
{
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
return
tdbTbUpsert
(
pState
->
pSessionStateDb
,
&
sKey
,
sizeof
(
SStateSessionKey
),
value
,
vLen
,
&
pState
->
txn
);
return
tdbTbUpsert
(
pState
->
pTdbState
->
pSessionStateDb
,
&
sKey
,
sizeof
(
SStateSessionKey
),
value
,
vLen
,
&
pState
->
pTdbState
->
txn
);
}
int32_t
streamStateSessionGet
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
...
...
@@ -543,7 +553,7 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
int32_t
streamStateSessionDel
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
SStateSessionKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
return
tdbTbDelete
(
pState
->
p
SessionStateDb
,
&
sKey
,
sizeof
(
SStateSessionKey
),
&
p
State
->
txn
);
return
tdbTbDelete
(
pState
->
p
TdbState
->
pSessionStateDb
,
&
sKey
,
sizeof
(
SStateSessionKey
),
&
pState
->
pTdb
State
->
txn
);
}
SStreamStateCur
*
streamStateSessionSeekKeyCurrentPrev
(
SStreamState
*
pState
,
const
SSessionKey
*
key
)
{
...
...
@@ -552,7 +562,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
return
NULL
;
}
pCur
->
number
=
pState
->
number
;
if
(
tdbTbcOpen
(
pState
->
pSessionStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
if
(
tdbTbcOpen
(
pState
->
p
TdbState
->
p
SessionStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
...
...
@@ -579,7 +589,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons
return
NULL
;
}
pCur
->
number
=
pState
->
number
;
if
(
tdbTbcOpen
(
pState
->
pSessionStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
if
(
tdbTbcOpen
(
pState
->
p
TdbState
->
p
SessionStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
...
...
@@ -607,7 +617,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
return
NULL
;
}
pCur
->
number
=
pState
->
number
;
if
(
tdbTbcOpen
(
pState
->
pSessionStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
if
(
tdbTbcOpen
(
pState
->
p
TdbState
->
p
SessionStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
NULL
;
}
...
...
@@ -674,7 +684,7 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey*
return
-
1
;
}
pCur
->
number
=
pState
->
number
;
if
(
tdbTbcOpen
(
pState
->
pSessionStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
if
(
tdbTbcOpen
(
pState
->
p
TdbState
->
p
SessionStateDb
,
&
pCur
->
pCur
,
NULL
)
<
0
)
{
streamStateFreeCur
(
pCur
);
return
-
1
;
}
...
...
@@ -821,13 +831,19 @@ _end:
}
int32_t
streamStatePutParName
(
SStreamState
*
pState
,
int64_t
groupId
,
const
char
tbname
[
TSDB_TABLE_NAME_LEN
])
{
tdbTbUpsert
(
pState
->
pParNameDb
,
&
groupId
,
sizeof
(
int64_t
),
tbname
,
TSDB_TABLE_NAME_LEN
,
&
pState
->
txn
);
tdbTbUpsert
(
pState
->
pTdbState
->
pParNameDb
,
&
groupId
,
sizeof
(
int64_t
),
tbname
,
TSDB_TABLE_NAME_LEN
,
&
pState
->
pTdbState
->
txn
);
return
0
;
}
int32_t
streamStateGetParName
(
SStreamState
*
pState
,
int64_t
groupId
,
void
**
pVal
)
{
int32_t
len
;
return
tdbTbGet
(
pState
->
pParNameDb
,
&
groupId
,
sizeof
(
int64_t
),
pVal
,
&
len
);
return
tdbTbGet
(
pState
->
pTdbState
->
pParNameDb
,
&
groupId
,
sizeof
(
int64_t
),
pVal
,
&
len
);
}
void
streamStateDestroy
(
SStreamState
*
pState
)
{
taosMemoryFreeClear
(
pState
->
pTdbState
);
taosMemoryFreeClear
(
pState
);
}
#if 0
...
...
@@ -837,7 +853,7 @@ char* streamStateSessionDump(SStreamState* pState) {
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) {
if (tdbTbcOpen(pState->p
TdbState->p
SessionStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录