Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d5c64f71
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d5c64f71
编写于
12月 30, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
12月 30, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19264 from taosdata/fix/TD-21216
enh: remove assert
上级
3a11c622
d011de2c
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
90 addition
and
85 deletion
+90
-85
source/libs/index/inc/indexUtil.h
source/libs/index/inc/indexUtil.h
+0
-1
source/libs/index/src/index.c
source/libs/index/src/index.c
+3
-1
source/libs/index/src/indexComm.c
source/libs/index/src/indexComm.c
+2
-3
source/libs/index/src/indexFilter.c
source/libs/index/src/indexFilter.c
+3
-1
source/libs/index/src/indexFst.c
source/libs/index/src/indexFst.c
+28
-32
source/libs/index/src/indexFstDfa.c
source/libs/index/src/indexFstDfa.c
+3
-2
source/libs/index/src/indexFstFile.c
source/libs/index/src/indexFstFile.c
+9
-5
source/libs/index/src/indexFstRegister.c
source/libs/index/src/indexFstRegister.c
+2
-2
source/libs/index/src/indexFstUtil.c
source/libs/index/src/indexFstUtil.c
+0
-1
source/libs/index/src/indexTfile.c
source/libs/index/src/indexTfile.c
+8
-8
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+3
-12
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+8
-5
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+5
-2
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+16
-10
未找到文件。
source/libs/index/inc/indexUtil.h
浏览文件 @
d5c64f71
...
@@ -36,7 +36,6 @@ extern "C" {
...
@@ -36,7 +36,6 @@ extern "C" {
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
do { \
do { \
type c = var; \
type c = var; \
assert(sizeof(type) == sizeof(c)); \
memcpy((void *)buf, (void *)&c, sizeof(c)); \
memcpy((void *)buf, (void *)&c, sizeof(c)); \
buf += sizeof(c); \
buf += sizeof(c); \
} while (0)
} while (0)
...
...
source/libs/index/src/index.c
浏览文件 @
d5c64f71
...
@@ -226,7 +226,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
...
@@ -226,7 +226,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
indexDebug
(
"w suid:%"
PRIu64
", colName:%s, colType:%d"
,
key
.
suid
,
key
.
colName
,
key
.
colType
);
indexDebug
(
"w suid:%"
PRIu64
", colName:%s, colType:%d"
,
key
.
suid
,
key
.
colName
,
key
.
colType
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
IndexCache
**
cache
=
taosHashGet
(
index
->
colObj
,
buf
,
sz
);
assert
(
*
cache
!=
NULL
);
ASSERTS
(
*
cache
!=
NULL
,
"index-cache already release"
);
if
(
*
cache
==
NULL
)
return
-
1
;
int
ret
=
idxCachePut
(
*
cache
,
p
,
uid
);
int
ret
=
idxCachePut
(
*
cache
,
p
,
uid
);
if
(
ret
!=
0
)
{
if
(
ret
!=
0
)
{
return
ret
;
return
ret
;
...
...
source/libs/index/src/indexComm.c
浏览文件 @
d5c64f71
...
@@ -170,7 +170,6 @@ TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t d
...
@@ -170,7 +170,6 @@ TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t d
}
}
return
tDoCompare
(
func
,
cmptype
,
&
va
,
&
vb
);
return
tDoCompare
(
func
,
cmptype
,
&
va
,
&
vb
);
}
}
assert
(
0
);
return
BREAK
;
return
BREAK
;
#endif
#endif
}
}
...
@@ -367,7 +366,7 @@ int32_t idxConvertData(void* src, int8_t type, void** dst) {
...
@@ -367,7 +366,7 @@ int32_t idxConvertData(void* src, int8_t type, void** dst) {
tlen
=
taosEncodeBinary
(
dst
,
src
,
strlen
(
src
));
tlen
=
taosEncodeBinary
(
dst
,
src
,
strlen
(
src
));
break
;
break
;
default:
default:
ASSERT
(
0
);
ASSERT
S
(
0
,
"index invalid input type"
);
break
;
break
;
}
}
*
dst
=
(
char
*
)
*
dst
-
tlen
;
*
dst
=
(
char
*
)
*
dst
-
tlen
;
...
@@ -459,7 +458,7 @@ int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) {
...
@@ -459,7 +458,7 @@ int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) {
*
dst
=
(
char
*
)
*
dst
-
tlen
;
*
dst
=
(
char
*
)
*
dst
-
tlen
;
break
;
break
;
default:
default:
ASSERT
(
0
);
ASSERT
S
(
0
,
"index invalid input type"
);
break
;
break
;
}
}
return
tlen
;
return
tlen
;
...
...
source/libs/index/src/indexFilter.c
浏览文件 @
d5c64f71
...
@@ -206,7 +206,9 @@ static FORCE_INLINE int32_t sifGetValueFromNode(SNode *node, char **value) {
...
@@ -206,7 +206,9 @@ static FORCE_INLINE int32_t sifGetValueFromNode(SNode *node, char **value) {
static
FORCE_INLINE
int32_t
sifInitJsonParam
(
SNode
*
node
,
SIFParam
*
param
,
SIFCtx
*
ctx
)
{
static
FORCE_INLINE
int32_t
sifInitJsonParam
(
SNode
*
node
,
SIFParam
*
param
,
SIFCtx
*
ctx
)
{
SOperatorNode
*
nd
=
(
SOperatorNode
*
)
node
;
SOperatorNode
*
nd
=
(
SOperatorNode
*
)
node
;
assert
(
nodeType
(
node
)
==
QUERY_NODE_OPERATOR
);
if
(
nodeType
(
node
)
!=
QUERY_NODE_OPERATOR
)
{
return
-
1
;
}
SColumnNode
*
l
=
(
SColumnNode
*
)
nd
->
pLeft
;
SColumnNode
*
l
=
(
SColumnNode
*
)
nd
->
pLeft
;
SValueNode
*
r
=
(
SValueNode
*
)
nd
->
pRight
;
SValueNode
*
r
=
(
SValueNode
*
)
nd
->
pRight
;
...
...
source/libs/index/src/indexFst.c
浏览文件 @
d5c64f71
...
@@ -65,10 +65,7 @@ void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes* nodes, bool isFinal) {
...
@@ -65,10 +65,7 @@ void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes* nodes, bool isFinal) {
taosArrayPush
(
nodes
->
stack
,
&
un
);
taosArrayPush
(
nodes
->
stack
,
&
un
);
}
}
FstBuilderNode
*
fstUnFinishedNodesPopRoot
(
FstUnFinishedNodes
*
nodes
)
{
FstBuilderNode
*
fstUnFinishedNodesPopRoot
(
FstUnFinishedNodes
*
nodes
)
{
assert
(
taosArrayGetSize
(
nodes
->
stack
)
==
1
);
FstBuilderNodeUnfinished
*
un
=
taosArrayPop
(
nodes
->
stack
);
FstBuilderNodeUnfinished
*
un
=
taosArrayPop
(
nodes
->
stack
);
assert
(
un
->
last
==
NULL
);
return
un
->
node
;
return
un
->
node
;
}
}
...
@@ -82,7 +79,6 @@ FstBuilderNode* fstUnFinishedNodesPopFreeze(FstUnFinishedNodes* nodes, CompiledA
...
@@ -82,7 +79,6 @@ FstBuilderNode* fstUnFinishedNodesPopFreeze(FstUnFinishedNodes* nodes, CompiledA
FstBuilderNode
*
fstUnFinishedNodesPopEmpty
(
FstUnFinishedNodes
*
nodes
)
{
FstBuilderNode
*
fstUnFinishedNodesPopEmpty
(
FstUnFinishedNodes
*
nodes
)
{
FstBuilderNodeUnfinished
*
un
=
taosArrayPop
(
nodes
->
stack
);
FstBuilderNodeUnfinished
*
un
=
taosArrayPop
(
nodes
->
stack
);
assert
(
un
->
last
==
NULL
);
return
un
->
node
;
return
un
->
node
;
}
}
void
fstUnFinishedNodesSetRootOutput
(
FstUnFinishedNodes
*
nodes
,
Output
out
)
{
void
fstUnFinishedNodesSetRootOutput
(
FstUnFinishedNodes
*
nodes
,
Output
out
)
{
...
@@ -102,7 +98,8 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
...
@@ -102,7 +98,8 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
}
}
int32_t
sz
=
taosArrayGetSize
(
nodes
->
stack
)
-
1
;
int32_t
sz
=
taosArrayGetSize
(
nodes
->
stack
)
-
1
;
FstBuilderNodeUnfinished
*
un
=
taosArrayGet
(
nodes
->
stack
,
sz
);
FstBuilderNodeUnfinished
*
un
=
taosArrayGet
(
nodes
->
stack
,
sz
);
assert
(
un
->
last
==
NULL
);
ASSERTS
(
un
->
last
==
NULL
,
"index-fst meet unexpected node"
);
if
(
un
->
last
!=
NULL
)
return
;
// FstLastTransition *trn = taosMemoryMalloc(sizeof(FstLastTransition));
// FstLastTransition *trn = taosMemoryMalloc(sizeof(FstLastTransition));
// trn->inp = s->data[s->start];
// trn->inp = s->data[s->start];
...
@@ -247,7 +244,6 @@ void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition*
...
@@ -247,7 +244,6 @@ void fstStateCompileForOneTrans(IdxFstFile* w, CompiledAddr addr, FstTransition*
}
}
void
fstStateCompileForAnyTrans
(
IdxFstFile
*
w
,
CompiledAddr
addr
,
FstBuilderNode
*
node
)
{
void
fstStateCompileForAnyTrans
(
IdxFstFile
*
w
,
CompiledAddr
addr
,
FstBuilderNode
*
node
)
{
int32_t
sz
=
taosArrayGetSize
(
node
->
trans
);
int32_t
sz
=
taosArrayGetSize
(
node
->
trans
);
assert
(
sz
<=
256
);
uint8_t
tSize
=
0
;
uint8_t
tSize
=
0
;
uint8_t
oSize
=
packSize
(
node
->
finalOutput
);
uint8_t
oSize
=
packSize
(
node
->
finalOutput
);
...
@@ -322,7 +318,7 @@ void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode
...
@@ -322,7 +318,7 @@ void fstStateCompileForAnyTrans(IdxFstFile* w, CompiledAddr addr, FstBuilderNode
// set_comm_input
// set_comm_input
void
fstStateSetCommInput
(
FstState
*
s
,
uint8_t
inp
)
{
void
fstStateSetCommInput
(
FstState
*
s
,
uint8_t
inp
)
{
assert
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
ASSERT
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
uint8_t
val
;
uint8_t
val
;
COMMON_INDEX
(
inp
,
0
b111111
,
val
);
COMMON_INDEX
(
inp
,
0
b111111
,
val
);
...
@@ -331,7 +327,7 @@ void fstStateSetCommInput(FstState* s, uint8_t inp) {
...
@@ -331,7 +327,7 @@ void fstStateSetCommInput(FstState* s, uint8_t inp) {
// comm_input
// comm_input
uint8_t
fstStateCommInput
(
FstState
*
s
,
bool
*
null
)
{
uint8_t
fstStateCommInput
(
FstState
*
s
,
bool
*
null
)
{
assert
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
ASSERT
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
uint8_t
v
=
s
->
val
&
0
b00111111
;
uint8_t
v
=
s
->
val
&
0
b00111111
;
if
(
v
==
0
)
{
if
(
v
==
0
)
{
*
null
=
true
;
*
null
=
true
;
...
@@ -344,7 +340,7 @@ uint8_t fstStateCommInput(FstState* s, bool* null) {
...
@@ -344,7 +340,7 @@ uint8_t fstStateCommInput(FstState* s, bool* null) {
// input_len
// input_len
uint64_t
fstStateInputLen
(
FstState
*
s
)
{
uint64_t
fstStateInputLen
(
FstState
*
s
)
{
assert
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
ASSERT
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
bool
null
=
false
;
bool
null
=
false
;
fstStateCommInput
(
s
,
&
null
);
fstStateCommInput
(
s
,
&
null
);
return
null
?
1
:
0
;
return
null
?
1
:
0
;
...
@@ -352,11 +348,11 @@ uint64_t fstStateInputLen(FstState* s) {
...
@@ -352,11 +348,11 @@ uint64_t fstStateInputLen(FstState* s) {
// end_addr
// end_addr
uint64_t
fstStateEndAddrForOneTransNext
(
FstState
*
s
,
FstSlice
*
data
)
{
uint64_t
fstStateEndAddrForOneTransNext
(
FstState
*
s
,
FstSlice
*
data
)
{
assert
(
s
->
state
==
OneTransNext
);
ASSERT
(
s
->
state
==
OneTransNext
);
return
FST_SLICE_LEN
(
data
)
-
1
-
fstStateInputLen
(
s
);
return
FST_SLICE_LEN
(
data
)
-
1
-
fstStateInputLen
(
s
);
}
}
uint64_t
fstStateEndAddrForOneTrans
(
FstState
*
s
,
FstSlice
*
data
,
PackSizes
sizes
)
{
uint64_t
fstStateEndAddrForOneTrans
(
FstState
*
s
,
FstSlice
*
data
,
PackSizes
sizes
)
{
assert
(
s
->
state
==
OneTrans
);
ASSERT
(
s
->
state
==
OneTrans
);
return
FST_SLICE_LEN
(
data
)
-
1
-
fstStateInputLen
(
s
)
-
1
// pack size
return
FST_SLICE_LEN
(
data
)
-
1
-
fstStateInputLen
(
s
)
-
1
// pack size
-
FST_GET_TRANSITION_PACK_SIZE
(
sizes
)
-
FST_GET_OUTPUT_PACK_SIZE
(
sizes
);
-
FST_GET_TRANSITION_PACK_SIZE
(
sizes
)
-
FST_GET_OUTPUT_PACK_SIZE
(
sizes
);
}
}
...
@@ -370,7 +366,7 @@ uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice*
...
@@ -370,7 +366,7 @@ uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice*
}
}
// input
// input
uint8_t
fstStateInput
(
FstState
*
s
,
FstNode
*
node
)
{
uint8_t
fstStateInput
(
FstState
*
s
,
FstNode
*
node
)
{
assert
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
ASSERT
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
FstSlice
*
slice
=
&
node
->
data
;
FstSlice
*
slice
=
&
node
->
data
;
bool
null
=
false
;
bool
null
=
false
;
uint8_t
inp
=
fstStateCommInput
(
s
,
&
null
);
uint8_t
inp
=
fstStateCommInput
(
s
,
&
null
);
...
@@ -378,7 +374,7 @@ uint8_t fstStateInput(FstState* s, FstNode* node) {
...
@@ -378,7 +374,7 @@ uint8_t fstStateInput(FstState* s, FstNode* node) {
return
null
==
false
?
inp
:
data
[
node
->
start
-
1
];
return
null
==
false
?
inp
:
data
[
node
->
start
-
1
];
}
}
uint8_t
fstStateInputForAnyTrans
(
FstState
*
s
,
FstNode
*
node
,
uint64_t
i
)
{
uint8_t
fstStateInputForAnyTrans
(
FstState
*
s
,
FstNode
*
node
,
uint64_t
i
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
FstSlice
*
slice
=
&
node
->
data
;
FstSlice
*
slice
=
&
node
->
data
;
uint64_t
at
=
node
->
start
-
fstStateNtransLen
(
s
)
-
1
// pack size
uint64_t
at
=
node
->
start
-
fstStateNtransLen
(
s
)
-
1
// pack size
...
@@ -390,7 +386,7 @@ uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
...
@@ -390,7 +386,7 @@ uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
// trans_addr
// trans_addr
CompiledAddr
fstStateTransAddr
(
FstState
*
s
,
FstNode
*
node
)
{
CompiledAddr
fstStateTransAddr
(
FstState
*
s
,
FstNode
*
node
)
{
assert
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
ASSERT
(
s
->
state
==
OneTransNext
||
s
->
state
==
OneTrans
);
FstSlice
*
slice
=
&
node
->
data
;
FstSlice
*
slice
=
&
node
->
data
;
if
(
s
->
state
==
OneTransNext
)
{
if
(
s
->
state
==
OneTransNext
)
{
return
(
CompiledAddr
)(
node
->
end
)
-
1
;
return
(
CompiledAddr
)(
node
->
end
)
-
1
;
...
@@ -406,7 +402,7 @@ CompiledAddr fstStateTransAddr(FstState* s, FstNode* node) {
...
@@ -406,7 +402,7 @@ CompiledAddr fstStateTransAddr(FstState* s, FstNode* node) {
}
}
}
}
CompiledAddr
fstStateTransAddrForAnyTrans
(
FstState
*
s
,
FstNode
*
node
,
uint64_t
i
)
{
CompiledAddr
fstStateTransAddrForAnyTrans
(
FstState
*
s
,
FstNode
*
node
,
uint64_t
i
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
FstSlice
*
slice
=
&
node
->
data
;
FstSlice
*
slice
=
&
node
->
data
;
uint8_t
tSizes
=
FST_GET_TRANSITION_PACK_SIZE
(
node
->
sizes
);
uint8_t
tSizes
=
FST_GET_TRANSITION_PACK_SIZE
(
node
->
sizes
);
...
@@ -418,7 +414,7 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i
...
@@ -418,7 +414,7 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i
// sizes
// sizes
PackSizes
fstStateSizes
(
FstState
*
s
,
FstSlice
*
slice
)
{
PackSizes
fstStateSizes
(
FstState
*
s
,
FstSlice
*
slice
)
{
assert
(
s
->
state
==
OneTrans
||
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
OneTrans
||
s
->
state
==
AnyTrans
);
uint64_t
i
;
uint64_t
i
;
if
(
s
->
state
==
OneTrans
)
{
if
(
s
->
state
==
OneTrans
)
{
i
=
FST_SLICE_LEN
(
slice
)
-
1
-
fstStateInputLen
(
s
)
-
1
;
i
=
FST_SLICE_LEN
(
slice
)
-
1
-
fstStateInputLen
(
s
)
-
1
;
...
@@ -431,7 +427,7 @@ PackSizes fstStateSizes(FstState* s, FstSlice* slice) {
...
@@ -431,7 +427,7 @@ PackSizes fstStateSizes(FstState* s, FstSlice* slice) {
}
}
// Output
// Output
Output
fstStateOutput
(
FstState
*
s
,
FstNode
*
node
)
{
Output
fstStateOutput
(
FstState
*
s
,
FstNode
*
node
)
{
assert
(
s
->
state
==
OneTrans
);
ASSERT
(
s
->
state
==
OneTrans
);
uint8_t
oSizes
=
FST_GET_OUTPUT_PACK_SIZE
(
node
->
sizes
);
uint8_t
oSizes
=
FST_GET_OUTPUT_PACK_SIZE
(
node
->
sizes
);
if
(
oSizes
==
0
)
{
if
(
oSizes
==
0
)
{
...
@@ -445,7 +441,7 @@ Output fstStateOutput(FstState* s, FstNode* node) {
...
@@ -445,7 +441,7 @@ Output fstStateOutput(FstState* s, FstNode* node) {
return
unpackUint64
(
data
+
i
,
oSizes
);
return
unpackUint64
(
data
+
i
,
oSizes
);
}
}
Output
fstStateOutputForAnyTrans
(
FstState
*
s
,
FstNode
*
node
,
uint64_t
i
)
{
Output
fstStateOutputForAnyTrans
(
FstState
*
s
,
FstNode
*
node
,
uint64_t
i
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
uint8_t
oSizes
=
FST_GET_OUTPUT_PACK_SIZE
(
node
->
sizes
);
uint8_t
oSizes
=
FST_GET_OUTPUT_PACK_SIZE
(
node
->
sizes
);
if
(
oSizes
==
0
)
{
if
(
oSizes
==
0
)
{
...
@@ -462,19 +458,19 @@ Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
...
@@ -462,19 +458,19 @@ Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
// anyTrans specify function
// anyTrans specify function
void
fstStateSetFinalState
(
FstState
*
s
,
bool
yes
)
{
void
fstStateSetFinalState
(
FstState
*
s
,
bool
yes
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
if
(
yes
)
{
if
(
yes
)
{
s
->
val
|=
0
b01000000
;
s
->
val
|=
0
b01000000
;
}
}
return
;
return
;
}
}
bool
fstStateIsFinalState
(
FstState
*
s
)
{
bool
fstStateIsFinalState
(
FstState
*
s
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
return
(
s
->
val
&
0
b01000000
)
==
0
b01000000
;
return
(
s
->
val
&
0
b01000000
)
==
0
b01000000
;
}
}
void
fstStateSetStateNtrans
(
FstState
*
s
,
uint8_t
n
)
{
void
fstStateSetStateNtrans
(
FstState
*
s
,
uint8_t
n
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
if
(
n
<=
0
b00111111
)
{
if
(
n
<=
0
b00111111
)
{
s
->
val
=
(
s
->
val
&
0
b11000000
)
|
n
;
s
->
val
=
(
s
->
val
&
0
b11000000
)
|
n
;
}
}
...
@@ -482,7 +478,7 @@ void fstStateSetStateNtrans(FstState* s, uint8_t n) {
...
@@ -482,7 +478,7 @@ void fstStateSetStateNtrans(FstState* s, uint8_t n) {
}
}
// state_ntrans
// state_ntrans
uint8_t
fstStateStateNtrans
(
FstState
*
s
,
bool
*
null
)
{
uint8_t
fstStateStateNtrans
(
FstState
*
s
,
bool
*
null
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
*
null
=
false
;
*
null
=
false
;
uint8_t
n
=
s
->
val
&
0
b00111111
;
uint8_t
n
=
s
->
val
&
0
b00111111
;
...
@@ -492,16 +488,16 @@ uint8_t fstStateStateNtrans(FstState* s, bool* null) {
...
@@ -492,16 +488,16 @@ uint8_t fstStateStateNtrans(FstState* s, bool* null) {
return
n
;
return
n
;
}
}
uint64_t
fstStateTotalTransSize
(
FstState
*
s
,
uint64_t
version
,
PackSizes
sizes
,
uint64_t
nTrans
)
{
uint64_t
fstStateTotalTransSize
(
FstState
*
s
,
uint64_t
version
,
PackSizes
sizes
,
uint64_t
nTrans
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
uint64_t
idxSize
=
fstStateTransIndexSize
(
s
,
version
,
nTrans
);
uint64_t
idxSize
=
fstStateTransIndexSize
(
s
,
version
,
nTrans
);
return
nTrans
+
(
nTrans
*
FST_GET_TRANSITION_PACK_SIZE
(
sizes
))
+
idxSize
;
return
nTrans
+
(
nTrans
*
FST_GET_TRANSITION_PACK_SIZE
(
sizes
))
+
idxSize
;
}
}
uint64_t
fstStateTransIndexSize
(
FstState
*
s
,
uint64_t
version
,
uint64_t
nTrans
)
{
uint64_t
fstStateTransIndexSize
(
FstState
*
s
,
uint64_t
version
,
uint64_t
nTrans
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
return
(
version
>=
2
&&
nTrans
>
TRANS_INDEX_THRESHOLD
)
?
256
:
0
;
return
(
version
>=
2
&&
nTrans
>
TRANS_INDEX_THRESHOLD
)
?
256
:
0
;
}
}
uint64_t
fstStateNtransLen
(
FstState
*
s
)
{
uint64_t
fstStateNtransLen
(
FstState
*
s
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
bool
null
=
false
;
bool
null
=
false
;
fstStateStateNtrans
(
s
,
&
null
);
fstStateStateNtrans
(
s
,
&
null
);
return
null
==
true
?
1
:
0
;
return
null
==
true
?
1
:
0
;
...
@@ -530,7 +526,7 @@ Output fstStateFinalOutput(FstState* s, uint64_t version, FstSlice* slice, PackS
...
@@ -530,7 +526,7 @@ Output fstStateFinalOutput(FstState* s, uint64_t version, FstSlice* slice, PackS
return
unpackUint64
(
data
+
at
,
(
uint8_t
)
oSizes
);
return
unpackUint64
(
data
+
at
,
(
uint8_t
)
oSizes
);
}
}
uint64_t
fstStateFindInput
(
FstState
*
s
,
FstNode
*
node
,
uint8_t
b
,
bool
*
null
)
{
uint64_t
fstStateFindInput
(
FstState
*
s
,
FstNode
*
node
,
uint8_t
b
,
bool
*
null
)
{
assert
(
s
->
state
==
AnyTrans
);
ASSERT
(
s
->
state
==
AnyTrans
);
FstSlice
*
slice
=
&
node
->
data
;
FstSlice
*
slice
=
&
node
->
data
;
if
(
node
->
version
>=
2
&&
node
->
nTrans
>
TRANS_INDEX_THRESHOLD
)
{
if
(
node
->
version
>=
2
&&
node
->
nTrans
>
TRANS_INDEX_THRESHOLD
)
{
uint64_t
at
=
node
->
start
-
fstStateNtransLen
(
s
)
-
1
// pack size
uint64_t
at
=
node
->
start
-
fstStateNtransLen
(
s
)
-
1
// pack size
...
@@ -676,17 +672,17 @@ bool fstNodeGetTransitionAddrAt(FstNode* node, uint64_t i, CompiledAddr* res) {
...
@@ -676,17 +672,17 @@ bool fstNodeGetTransitionAddrAt(FstNode* node, uint64_t i, CompiledAddr* res) {
bool
s
=
true
;
bool
s
=
true
;
FstState
*
st
=
&
node
->
state
;
FstState
*
st
=
&
node
->
state
;
if
(
st
->
state
==
OneTransNext
)
{
if
(
st
->
state
==
OneTransNext
)
{
assert
(
i
==
0
);
ASSERT
(
i
==
0
);
fstStateTransAddr
(
st
,
node
);
fstStateTransAddr
(
st
,
node
);
}
else
if
(
st
->
state
==
OneTrans
)
{
}
else
if
(
st
->
state
==
OneTrans
)
{
assert
(
i
==
0
);
ASSERT
(
i
==
0
);
fstStateTransAddr
(
st
,
node
);
fstStateTransAddr
(
st
,
node
);
}
else
if
(
st
->
state
==
AnyTrans
)
{
}
else
if
(
st
->
state
==
AnyTrans
)
{
fstStateTransAddrForAnyTrans
(
st
,
node
,
i
);
fstStateTransAddrForAnyTrans
(
st
,
node
,
i
);
}
else
if
(
FST_STATE_EMPTY_FINAL
(
node
))
{
}
else
if
(
FST_STATE_EMPTY_FINAL
(
node
))
{
s
=
false
;
s
=
false
;
}
else
{
}
else
{
assert
(
0
);
ASSERT
(
0
);
}
}
return
s
;
return
s
;
}
}
...
@@ -722,7 +718,7 @@ bool fstNodeFindInput(FstNode* node, uint8_t b, uint64_t* res) {
...
@@ -722,7 +718,7 @@ bool fstNodeFindInput(FstNode* node, uint8_t b, uint64_t* res) {
bool
fstNodeCompile
(
FstNode
*
node
,
void
*
w
,
CompiledAddr
lastAddr
,
CompiledAddr
addr
,
FstBuilderNode
*
builderNode
)
{
bool
fstNodeCompile
(
FstNode
*
node
,
void
*
w
,
CompiledAddr
lastAddr
,
CompiledAddr
addr
,
FstBuilderNode
*
builderNode
)
{
int32_t
sz
=
taosArrayGetSize
(
builderNode
->
trans
);
int32_t
sz
=
taosArrayGetSize
(
builderNode
->
trans
);
assert
(
sz
<
256
);
ASSERT
(
sz
<
256
);
if
(
sz
==
0
&&
builderNode
->
isFinal
&&
builderNode
->
finalOutput
==
0
)
{
if
(
sz
==
0
&&
builderNode
->
isFinal
&&
builderNode
->
finalOutput
==
0
)
{
return
true
;
return
true
;
}
else
if
(
sz
!=
1
||
builderNode
->
isFinal
)
{
}
else
if
(
sz
!=
1
||
builderNode
->
isFinal
)
{
...
@@ -804,7 +800,7 @@ void fstBuilderInsertOutput(FstBuilder* b, FstSlice bs, Output in) {
...
@@ -804,7 +800,7 @@ void fstBuilderInsertOutput(FstBuilder* b, FstSlice bs, Output in) {
uint64_t
prefixLen
=
fstUnFinishedNodesFindCommPrefixAndSetOutput
(
b
->
unfinished
,
bs
,
in
,
&
out
);
uint64_t
prefixLen
=
fstUnFinishedNodesFindCommPrefixAndSetOutput
(
b
->
unfinished
,
bs
,
in
,
&
out
);
if
(
prefixLen
==
FST_SLICE_LEN
(
s
))
{
if
(
prefixLen
==
FST_SLICE_LEN
(
s
))
{
assert
(
out
==
0
);
ASSERT
(
out
==
0
);
return
;
return
;
}
}
...
@@ -848,7 +844,7 @@ void fstBuilderCompileFrom(FstBuilder* b, uint64_t istate) {
...
@@ -848,7 +844,7 @@ void fstBuilderCompileFrom(FstBuilder* b, uint64_t istate) {
addr
=
fstBuilderCompile
(
b
,
bn
);
addr
=
fstBuilderCompile
(
b
,
bn
);
fstBuilderNodeDestroy
(
bn
);
fstBuilderNodeDestroy
(
bn
);
assert
(
addr
!=
NONE_ADDRESS
);
ASSERT
(
addr
!=
NONE_ADDRESS
);
}
}
fstUnFinishedNodesTopLastFreeze
(
b
->
unfinished
,
addr
);
fstUnFinishedNodesTopLastFreeze
(
b
->
unfinished
,
addr
);
return
;
return
;
...
...
source/libs/index/src/indexFstDfa.c
浏览文件 @
d5c64f71
...
@@ -104,8 +104,9 @@ bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet
...
@@ -104,8 +104,9 @@ bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet
DfaState
*
t
=
taosArrayGet
(
builder
->
dfa
->
states
,
state
);
DfaState
*
t
=
taosArrayGet
(
builder
->
dfa
->
states
,
state
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
t
->
insts
);
i
++
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
t
->
insts
);
i
++
)
{
int32_t
ip
=
*
(
int32_t
*
)
taosArrayGet
(
t
->
insts
,
i
);
int32_t
ip
=
*
(
int32_t
*
)
taosArrayGet
(
t
->
insts
,
i
);
bool
succ
=
sparSetAdd
(
cur
,
ip
,
NULL
);
assert
(
succ
==
true
);
bool
succ
=
sparSetAdd
(
cur
,
ip
,
NULL
);
if
(
succ
==
false
)
return
false
;
}
}
dfaRun
(
builder
->
dfa
,
cur
,
next
,
byte
);
dfaRun
(
builder
->
dfa
,
cur
,
next
,
byte
);
...
...
source/libs/index/src/indexFstFile.c
浏览文件 @
d5c64f71
...
@@ -100,7 +100,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
...
@@ -100,7 +100,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
do
{
do
{
char
key
[
1024
]
=
{
0
};
char
key
[
1024
]
=
{
0
};
assert
(
strlen
(
ctx
->
file
.
buf
)
+
1
+
64
<
sizeof
(
key
));
ASSERT
(
strlen
(
ctx
->
file
.
buf
)
+
1
+
64
<
sizeof
(
key
));
idxGenLRUKey
(
key
,
ctx
->
file
.
buf
,
blkId
);
idxGenLRUKey
(
key
,
ctx
->
file
.
buf
,
blkId
);
LRUHandle
*
h
=
taosLRUCacheLookup
(
ctx
->
lru
,
key
,
strlen
(
key
));
LRUHandle
*
h
=
taosLRUCacheLookup
(
ctx
->
lru
,
key
,
strlen
(
key
));
...
@@ -114,7 +114,8 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
...
@@ -114,7 +114,8 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
if
(
left
<
kBlockSize
)
{
if
(
left
<
kBlockSize
)
{
nread
=
TMIN
(
left
,
len
);
nread
=
TMIN
(
left
,
len
);
int32_t
bytes
=
taosPReadFile
(
ctx
->
file
.
pFile
,
buf
+
total
,
nread
,
offset
);
int32_t
bytes
=
taosPReadFile
(
ctx
->
file
.
pFile
,
buf
+
total
,
nread
,
offset
);
assert
(
bytes
==
nread
);
ASSERTS
(
bytes
==
nread
,
"index read incomplete data"
);
if
(
bytes
!=
nread
)
break
;
total
+=
bytes
;
total
+=
bytes
;
return
total
;
return
total
;
...
@@ -124,7 +125,8 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
...
@@ -124,7 +125,8 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
SDataBlock
*
blk
=
taosMemoryCalloc
(
1
,
cacheMemSize
);
SDataBlock
*
blk
=
taosMemoryCalloc
(
1
,
cacheMemSize
);
blk
->
blockId
=
blkId
;
blk
->
blockId
=
blkId
;
blk
->
nread
=
taosPReadFile
(
ctx
->
file
.
pFile
,
blk
->
buf
,
kBlockSize
,
blkId
*
kBlockSize
);
blk
->
nread
=
taosPReadFile
(
ctx
->
file
.
pFile
,
blk
->
buf
,
kBlockSize
,
blkId
*
kBlockSize
);
assert
(
blk
->
nread
<=
kBlockSize
);
ASSERTS
(
blk
->
nread
<=
kBlockSize
,
"index read incomplete data"
);
if
(
blk
->
nread
>
kBlockSize
)
break
;
if
(
blk
->
nread
<
kBlockSize
&&
blk
->
nread
<
len
)
{
if
(
blk
->
nread
<
kBlockSize
&&
blk
->
nread
<
len
)
{
taosMemoryFree
(
blk
);
taosMemoryFree
(
blk
);
...
@@ -275,7 +277,10 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
...
@@ -275,7 +277,10 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
// update checksum
// update checksum
IFileCtx
*
ctx
=
write
->
wrt
;
IFileCtx
*
ctx
=
write
->
wrt
;
int
nWrite
=
ctx
->
write
(
ctx
,
buf
,
len
);
int
nWrite
=
ctx
->
write
(
ctx
,
buf
,
len
);
assert
(
nWrite
==
len
);
ASSERTS
(
nWrite
==
len
,
"index write incomplete data"
);
if
(
nWrite
!=
len
)
{
return
-
1
;
}
write
->
count
+=
len
;
write
->
count
+=
len
;
write
->
summer
=
taosCalcChecksum
(
write
->
summer
,
buf
,
len
);
write
->
summer
=
taosCalcChecksum
(
write
->
summer
,
buf
,
len
);
...
@@ -302,7 +307,6 @@ int idxFileFlush(IdxFstFile* write) {
...
@@ -302,7 +307,6 @@ int idxFileFlush(IdxFstFile* write) {
}
}
void
idxFilePackUintIn
(
IdxFstFile
*
writer
,
uint64_t
n
,
uint8_t
nBytes
)
{
void
idxFilePackUintIn
(
IdxFstFile
*
writer
,
uint64_t
n
,
uint8_t
nBytes
)
{
assert
(
1
<=
nBytes
&&
nBytes
<=
8
);
uint8_t
*
buf
=
taosMemoryCalloc
(
8
,
sizeof
(
uint8_t
));
uint8_t
*
buf
=
taosMemoryCalloc
(
8
,
sizeof
(
uint8_t
));
for
(
uint8_t
i
=
0
;
i
<
nBytes
;
i
++
)
{
for
(
uint8_t
i
=
0
;
i
<
nBytes
;
i
++
)
{
buf
[
i
]
=
(
uint8_t
)
n
;
buf
[
i
]
=
(
uint8_t
)
n
;
...
...
source/libs/index/src/indexFstRegister.c
浏览文件 @
d5c64f71
...
@@ -57,8 +57,8 @@ static void fstRegistryCellPromote(SArray* arr, uint32_t start, uint32_t end) {
...
@@ -57,8 +57,8 @@ static void fstRegistryCellPromote(SArray* arr, uint32_t start, uint32_t end) {
if
(
start
>=
sz
&&
end
>=
sz
)
{
if
(
start
>=
sz
&&
end
>=
sz
)
{
return
;
return
;
}
}
ASSERTS
(
start
>=
end
,
"index-fst start lower than end"
);
assert
(
start
>=
end
)
;
if
(
start
<
end
)
return
;
int32_t
s
=
(
int32_t
)
start
;
int32_t
s
=
(
int32_t
)
start
;
int32_t
e
=
(
int32_t
)
end
;
int32_t
e
=
(
int32_t
)
end
;
...
...
source/libs/index/src/indexFstUtil.c
浏览文件 @
d5c64f71
...
@@ -101,7 +101,6 @@ FstSlice fstSliceDeepCopy(FstSlice* s, int32_t start, int32_t end) {
...
@@ -101,7 +101,6 @@ FstSlice fstSliceDeepCopy(FstSlice* s, int32_t start, int32_t end) {
int32_t
slen
;
int32_t
slen
;
uint8_t
*
data
=
fstSliceData
(
s
,
&
slen
);
uint8_t
*
data
=
fstSliceData
(
s
,
&
slen
);
assert
(
tlen
<=
slen
);
uint8_t
*
buf
=
taosMemoryMalloc
(
sizeof
(
uint8_t
)
*
tlen
);
uint8_t
*
buf
=
taosMemoryMalloc
(
sizeof
(
uint8_t
)
*
tlen
);
memcpy
(
buf
,
data
+
start
,
tlen
);
memcpy
(
buf
,
data
+
start
,
tlen
);
...
...
source/libs/index/src/indexTfile.c
浏览文件 @
d5c64f71
...
@@ -122,7 +122,6 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) {
...
@@ -122,7 +122,6 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) {
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int32_t
sz
=
idxSerialCacheKey
(
&
key
,
buf
);
int32_t
sz
=
idxSerialCacheKey
(
&
key
,
buf
);
assert
(
sz
<
sizeof
(
buf
));
taosHashPut
(
tcache
->
tableCache
,
buf
,
sz
,
&
reader
,
sizeof
(
void
*
));
taosHashPut
(
tcache
->
tableCache
,
buf
,
sz
,
&
reader
,
sizeof
(
void
*
));
tfileReaderRef
(
reader
);
tfileReaderRef
(
reader
);
}
}
...
@@ -151,9 +150,8 @@ void tfileCacheDestroy(TFileCache* tcache) {
...
@@ -151,9 +150,8 @@ void tfileCacheDestroy(TFileCache* tcache) {
}
}
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
ICacheKey
*
key
)
{
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
ICacheKey
*
key
)
{
char
buf
[
128
]
=
{
0
};
char
buf
[
128
]
=
{
0
};
int32_t
sz
=
idxSerialCacheKey
(
key
,
buf
);
int32_t
sz
=
idxSerialCacheKey
(
key
,
buf
);
assert
(
sz
<
sizeof
(
buf
));
TFileReader
**
reader
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
sz
);
TFileReader
**
reader
=
taosHashGet
(
tcache
->
tableCache
,
buf
,
sz
);
if
(
reader
==
NULL
||
*
reader
==
NULL
)
{
if
(
reader
==
NULL
||
*
reader
==
NULL
)
{
return
NULL
;
return
NULL
;
...
@@ -877,7 +875,7 @@ static int tfileWriteFooter(TFileWriter* write) {
...
@@ -877,7 +875,7 @@ static int tfileWriteFooter(TFileWriter* write) {
int
nwrite
=
write
->
ctx
->
write
(
write
->
ctx
,
buf
,
(
int32_t
)
strlen
(
buf
));
int
nwrite
=
write
->
ctx
->
write
(
write
->
ctx
,
buf
,
(
int32_t
)
strlen
(
buf
));
indexInfo
(
"tfile write footer size: %d"
,
write
->
ctx
->
size
(
write
->
ctx
));
indexInfo
(
"tfile write footer size: %d"
,
write
->
ctx
->
size
(
write
->
ctx
));
assert
(
nwrite
==
sizeof
(
FILE_MAGIC_NUMBER
)
);
ASSERTS
(
nwrite
==
sizeof
(
FILE_MAGIC_NUMBER
),
"index write incomplete data"
);
return
nwrite
;
return
nwrite
;
}
}
static
int
tfileReaderLoadHeader
(
TFileReader
*
reader
)
{
static
int
tfileReaderLoadHeader
(
TFileReader
*
reader
)
{
...
@@ -892,7 +890,6 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
...
@@ -892,7 +890,6 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
}
else
{
}
else
{
indexInfo
(
"actual Read: %d, to read: %d, filename: %s"
,
(
int
)(
nread
),
(
int
)
sizeof
(
buf
),
reader
->
ctx
->
file
.
buf
);
indexInfo
(
"actual Read: %d, to read: %d, filename: %s"
,
(
int
)(
nread
),
(
int
)
sizeof
(
buf
),
reader
->
ctx
->
file
.
buf
);
}
}
// assert(nread == sizeof(buf));
memcpy
(
&
reader
->
header
,
buf
,
sizeof
(
buf
));
memcpy
(
&
reader
->
header
,
buf
,
sizeof
(
buf
));
return
0
;
return
0
;
...
@@ -914,7 +911,10 @@ static int tfileReaderLoadFst(TFileReader* reader) {
...
@@ -914,7 +911,10 @@ static int tfileReaderLoadFst(TFileReader* reader) {
indexInfo
(
"nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %"
PRId64
"us"
,
nread
,
indexInfo
(
"nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %d, time cost: %"
PRId64
"us"
,
nread
,
reader
->
header
.
fstOffset
,
fstSize
,
ctx
->
file
.
buf
,
size
,
cost
);
reader
->
header
.
fstOffset
,
fstSize
,
ctx
->
file
.
buf
,
size
,
cost
);
// we assuse fst size less than FST_MAX_SIZE
// we assuse fst size less than FST_MAX_SIZE
assert
(
nread
>
0
&&
nread
<=
fstSize
);
ASSERTS
(
nread
>
0
&&
nread
<=
fstSize
,
"index read incomplete fst"
);
if
(
nread
<=
0
||
nread
>
fstSize
)
{
return
-
1
;
}
FstSlice
st
=
fstSliceCreate
((
uint8_t
*
)
buf
,
nread
);
FstSlice
st
=
fstSliceCreate
((
uint8_t
*
)
buf
,
nread
);
reader
->
fst
=
fstCreate
(
&
st
);
reader
->
fst
=
fstCreate
(
&
st
);
...
@@ -929,7 +929,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
...
@@ -929,7 +929,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
// add block cache
// add block cache
char
block
[
4096
]
=
{
0
};
char
block
[
4096
]
=
{
0
};
int32_t
nread
=
ctx
->
readFrom
(
ctx
,
block
,
sizeof
(
block
),
offset
);
int32_t
nread
=
ctx
->
readFrom
(
ctx
,
block
,
sizeof
(
block
),
offset
);
assert
(
nread
>=
sizeof
(
uint32_t
));
ASSERT
(
nread
>=
sizeof
(
uint32_t
));
char
*
p
=
block
;
char
*
p
=
block
;
int32_t
nid
=
*
(
int32_t
*
)
p
;
int32_t
nid
=
*
(
int32_t
*
)
p
;
...
...
source/libs/transport/src/trans.c
浏览文件 @
d5c64f71
...
@@ -160,21 +160,12 @@ int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
...
@@ -160,21 +160,12 @@ int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
int
rpcSendResponse
(
const
SRpcMsg
*
pMsg
)
{
return
transSendResponse
(
pMsg
);
}
int
rpcSendResponse
(
const
SRpcMsg
*
pMsg
)
{
return
transSendResponse
(
pMsg
);
}
void
rpcRefHandle
(
void
*
handle
,
int8_t
type
)
{
void
rpcRefHandle
(
void
*
handle
,
int8_t
type
)
{
(
*
taosRefHandle
[
type
])(
handle
);
}
assert
(
type
==
TAOS_CONN_SERVER
||
type
==
TAOS_CONN_CLIENT
);
(
*
taosRefHandle
[
type
])(
handle
);
}
void
rpcUnrefHandle
(
void
*
handle
,
int8_t
type
)
{
void
rpcUnrefHandle
(
void
*
handle
,
int8_t
type
)
{
(
*
taosUnRefHandle
[
type
])(
handle
);
}
assert
(
type
==
TAOS_CONN_SERVER
||
type
==
TAOS_CONN_CLIENT
);
(
*
taosUnRefHandle
[
type
])(
handle
);
}
int
rpcRegisterBrokenLinkArg
(
SRpcMsg
*
msg
)
{
return
transRegisterMsg
(
msg
);
}
int
rpcRegisterBrokenLinkArg
(
SRpcMsg
*
msg
)
{
return
transRegisterMsg
(
msg
);
}
int
rpcReleaseHandle
(
void
*
handle
,
int8_t
type
)
{
int
rpcReleaseHandle
(
void
*
handle
,
int8_t
type
)
{
return
(
*
transReleaseHandle
[
type
])(
handle
);
}
assert
(
type
==
TAOS_CONN_SERVER
||
type
==
TAOS_CONN_CLIENT
);
return
(
*
transReleaseHandle
[
type
])(
handle
);
}
int
rpcSetDefaultAddr
(
void
*
thandle
,
const
char
*
ip
,
const
char
*
fqdn
)
{
int
rpcSetDefaultAddr
(
void
*
thandle
,
const
char
*
ip
,
const
char
*
fqdn
)
{
// later
// later
...
...
source/libs/transport/src/transCli.c
浏览文件 @
d5c64f71
...
@@ -651,7 +651,6 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
...
@@ -651,7 +651,6 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return
;
return
;
}
}
assert
(
nread
<=
0
);
if
(
nread
==
0
)
{
if
(
nread
==
0
)
{
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
// nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
// nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under
...
@@ -801,7 +800,11 @@ static void cliSendCb(uv_write_t* req, int status) {
...
@@ -801,7 +800,11 @@ static void cliSendCb(uv_write_t* req, int status) {
}
}
void
cliSend
(
SCliConn
*
pConn
)
{
void
cliSend
(
SCliConn
*
pConn
)
{
assert
(
!
transQueueEmpty
(
&
pConn
->
cliMsgs
));
bool
empty
=
transQueueEmpty
(
&
pConn
->
cliMsgs
);
ASSERTS
(
empty
==
false
,
"trans-cli get invalid msg"
);
if
(
empty
==
true
)
{
return
;
}
SCliMsg
*
pCliMsg
=
NULL
;
SCliMsg
*
pCliMsg
=
NULL
;
CONN_GET_NEXT_SENDMSG
(
pConn
);
CONN_GET_NEXT_SENDMSG
(
pConn
);
...
@@ -933,7 +936,6 @@ void cliConnCb(uv_connect_t* req, int status) {
...
@@ -933,7 +936,6 @@ void cliConnCb(uv_connect_t* req, int status) {
transSockInfo2Str
(
&
sockname
,
pConn
->
src
);
transSockInfo2Str
(
&
sockname
,
pConn
->
src
);
tTrace
(
"%s conn %p connect to server successfully"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
tTrace
(
"%s conn %p connect to server successfully"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
assert
(
pConn
->
stream
==
req
->
handle
);
cliSend
(
pConn
);
cliSend
(
pConn
);
}
}
...
@@ -1237,7 +1239,7 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
...
@@ -1237,7 +1239,7 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) {
for
(
int
i
=
0
;
ahandle
==
0
&&
i
<
transQueueSize
(
&
conn
->
cliMsgs
);
i
++
)
{
for
(
int
i
=
0
;
ahandle
==
0
&&
i
<
transQueueSize
(
&
conn
->
cliMsgs
);
i
++
)
{
SCliMsg
*
cliMsg
=
transQueueGet
(
&
conn
->
cliMsgs
,
i
);
SCliMsg
*
cliMsg
=
transQueueGet
(
&
conn
->
cliMsgs
,
i
);
if
(
cliMsg
->
type
==
Release
)
{
if
(
cliMsg
->
type
==
Release
)
{
assert
(
pMsg
==
NULL
);
ASSERTS
(
pMsg
==
NULL
,
"trans-cli recv invaid release-req"
);
return
true
;
return
true
;
}
}
}
}
...
@@ -1665,7 +1667,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
...
@@ -1665,7 +1667,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if
(
pCtx
->
retryCode
!=
TSDB_CODE_SUCCESS
)
{
if
(
pCtx
->
retryCode
!=
TSDB_CODE_SUCCESS
)
{
int32_t
code
=
pResp
->
code
;
int32_t
code
=
pResp
->
code
;
// return internal code app
// return internal code app
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
code
==
TSDB_CODE_RPC_BROKEN_LINK
||
code
==
TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
)
{
if
(
code
==
TSDB_CODE_RPC_NETWORK_UNAVAIL
||
code
==
TSDB_CODE_RPC_BROKEN_LINK
||
code
==
TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
)
{
pResp
->
code
=
pCtx
->
retryCode
;
pResp
->
code
=
pCtx
->
retryCode
;
}
}
}
}
...
...
source/libs/transport/src/transComm.c
浏览文件 @
d5c64f71
...
@@ -134,7 +134,9 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
...
@@ -134,7 +134,9 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
if
(
total
>=
HEADSIZE
&&
!
p
->
invalid
)
{
if
(
total
>=
HEADSIZE
&&
!
p
->
invalid
)
{
*
buf
=
taosMemoryCalloc
(
1
,
total
);
*
buf
=
taosMemoryCalloc
(
1
,
total
);
memcpy
(
*
buf
,
p
->
buf
,
total
);
memcpy
(
*
buf
,
p
->
buf
,
total
);
transResetBuffer
(
connBuf
);
if
(
transResetBuffer
(
connBuf
)
<
0
)
{
return
-
1
;
}
}
else
{
}
else
{
total
=
-
1
;
total
=
-
1
;
}
}
...
@@ -154,7 +156,8 @@ int transResetBuffer(SConnBuffer* connBuf) {
...
@@ -154,7 +156,8 @@ int transResetBuffer(SConnBuffer* connBuf) {
p
->
total
=
0
;
p
->
total
=
0
;
p
->
len
=
0
;
p
->
len
=
0
;
}
else
{
}
else
{
assert
(
0
);
ASSERTS
(
0
,
"invalid read from sock buf"
);
return
-
1
;
}
}
return
0
;
return
0
;
}
}
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
d5c64f71
...
@@ -267,7 +267,10 @@ static bool uvHandleReq(SSvrConn* pConn) {
...
@@ -267,7 +267,10 @@ static bool uvHandleReq(SSvrConn* pConn) {
tGTrace
(
"%s handle %p conn:%p translated to app, refId:%"
PRIu64
,
transLabel
(
pTransInst
),
transMsg
.
info
.
handle
,
pConn
,
tGTrace
(
"%s handle %p conn:%p translated to app, refId:%"
PRIu64
,
transLabel
(
pTransInst
),
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
pConn
->
refId
);
assert
(
transMsg
.
info
.
handle
!=
NULL
);
ASSERTS
(
transMsg
.
info
.
handle
!=
NULL
,
"trans-svr failed to alloc handle to msg"
);
if
(
transMsg
.
info
.
handle
==
NULL
)
{
return
false
;
}
if
(
pHead
->
noResp
==
1
)
{
if
(
pHead
->
noResp
==
1
)
{
transMsg
.
info
.
refId
=
-
1
;
transMsg
.
info
.
refId
=
-
1
;
...
@@ -718,8 +721,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
...
@@ -718,8 +721,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return
;
return
;
}
}
// free memory allocated by
// free memory allocated by
assert
(
nread
==
strlen
(
notify
)
);
ASSERTS
(
nread
==
strlen
(
notify
),
"trans-svr mem corrupted"
);
assert
(
buf
->
base
[
0
]
==
notify
[
0
]
);
ASSERTS
(
buf
->
base
[
0
]
==
notify
[
0
],
"trans-svr mem corrupted"
);
taosMemoryFree
(
buf
->
base
);
taosMemoryFree
(
buf
->
base
);
SWorkThrd
*
pThrd
=
q
->
data
;
SWorkThrd
*
pThrd
=
q
->
data
;
...
@@ -731,7 +734,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
...
@@ -731,7 +734,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
}
}
uv_handle_type
pending
=
uv_pipe_pending_type
(
pipe
);
uv_handle_type
pending
=
uv_pipe_pending_type
(
pipe
);
assert
(
pending
==
UV_TCP
);
SSvrConn
*
pConn
=
createConn
(
pThrd
);
SSvrConn
*
pConn
=
createConn
(
pThrd
);
...
@@ -971,19 +973,24 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
...
@@ -971,19 +973,24 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
uv_pipe_t
*
pipe
=
&
(
srv
->
pipe
[
srv
->
numOfWorkerReady
][
0
]);
uv_pipe_t
*
pipe
=
&
(
srv
->
pipe
[
srv
->
numOfWorkerReady
][
0
]);
int
ret
=
uv_pipe_init
(
srv
->
loop
,
pipe
,
1
);
int
ret
=
uv_pipe_init
(
srv
->
loop
,
pipe
,
1
);
assert
(
ret
==
0
);
ASSERTS
(
ret
==
0
,
"trans-svr failed to init pipe"
);
if
(
ret
!=
0
)
return
;
ret
=
uv_accept
((
uv_stream_t
*
)
&
srv
->
pipeListen
,
(
uv_stream_t
*
)
pipe
);
ret
=
uv_accept
((
uv_stream_t
*
)
&
srv
->
pipeListen
,
(
uv_stream_t
*
)
pipe
);
assert
(
ret
==
0
);
ASSERTS
(
ret
==
0
,
"trans-svr failed to accept pipe msg"
);
if
(
ret
!=
0
)
return
;
ret
=
uv_is_readable
((
uv_stream_t
*
)
pipe
);
ret
=
uv_is_readable
((
uv_stream_t
*
)
pipe
);
assert
(
ret
==
1
);
ASSERTS
(
ret
==
1
,
"trans-svr pipe status corrupted"
);
if
(
ret
!=
1
)
return
;
ret
=
uv_is_writable
((
uv_stream_t
*
)
pipe
);
ret
=
uv_is_writable
((
uv_stream_t
*
)
pipe
);
assert
(
ret
==
1
);
ASSERTS
(
ret
==
1
,
"trans-svr pipe status corrupted"
);
if
(
ret
!=
1
)
return
;
ret
=
uv_is_closing
((
uv_handle_t
*
)
pipe
);
ret
=
uv_is_closing
((
uv_handle_t
*
)
pipe
);
assert
(
ret
==
0
);
ASSERTS
(
ret
==
0
,
"trans-svr pipe status corrupted"
);
if
(
ret
!=
0
)
return
;
srv
->
numOfWorkerReady
++
;
srv
->
numOfWorkerReady
++
;
}
}
...
@@ -1272,7 +1279,6 @@ int transSendResponse(const STransMsg* msg) {
...
@@ -1272,7 +1279,6 @@ int transSendResponse(const STransMsg* msg) {
SExHandle
*
exh
=
msg
->
info
.
handle
;
SExHandle
*
exh
=
msg
->
info
.
handle
;
int64_t
refId
=
msg
->
info
.
refId
;
int64_t
refId
=
msg
->
info
.
refId
;
ASYNC_CHECK_HANDLE
(
exh
,
refId
);
ASYNC_CHECK_HANDLE
(
exh
,
refId
);
assert
(
refId
!=
0
);
STransMsg
tmsg
=
*
msg
;
STransMsg
tmsg
=
*
msg
;
tmsg
.
info
.
refId
=
refId
;
tmsg
.
info
.
refId
=
refId
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录