Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
68a1c12d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
68a1c12d
编写于
9月 06, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add compile opt
上级
954a1bab
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
78 addition
and
85 deletion
+78
-85
source/libs/index/src/indexComm.c
source/libs/index/src/indexComm.c
+6
-6
source/libs/index/src/indexFilter.c
source/libs/index/src/indexFilter.c
+33
-34
source/libs/transport/src/thttp.c
source/libs/transport/src/thttp.c
+5
-5
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+22
-28
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+1
-1
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+11
-11
未找到文件。
source/libs/index/src/indexComm.c
浏览文件 @
68a1c12d
...
...
@@ -81,28 +81,28 @@ __compar_fn_t idxGetCompar(int8_t type) {
}
return
getComparFunc
(
type
,
0
);
}
static
TExeCond
tCompareLessThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareLessThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_LESS_THAN
,
a
,
b
,
type
);
}
static
TExeCond
tCompareLessEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareLessEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_LESS_EQUAL
,
a
,
b
,
type
);
}
static
TExeCond
tCompareGreaterThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareGreaterThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_GREATER_THAN
,
a
,
b
,
type
);
}
static
TExeCond
tCompareGreaterEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareGreaterEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_GREATER_EQUAL
,
a
,
b
,
type
);
}
static
TExeCond
tCompareContains
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareContains
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_TERM
,
a
,
b
,
type
);
}
static
TExeCond
tCompareEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_TERM
,
a
,
b
,
type
);
}
...
...
source/libs/index/src/indexFilter.c
浏览文件 @
68a1c12d
...
...
@@ -88,7 +88,7 @@ typedef struct SIFCtx {
SIndexMetaArg
arg
;
}
SIFCtx
;
static
int32_t
sifGetFuncFromSql
(
EOperatorType
src
,
EIndexQueryType
*
dst
)
{
static
FORCE_INLINE
int32_t
sifGetFuncFromSql
(
EOperatorType
src
,
EIndexQueryType
*
dst
)
{
if
(
src
==
OP_TYPE_GREATER_THAN
)
{
*
dst
=
QUERY_GREATER_THAN
;
}
else
if
(
src
==
OP_TYPE_GREATER_EQUAL
)
{
...
...
@@ -110,10 +110,9 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
}
typedef
int32_t
(
*
sif_func_t
)(
SIFParam
*
left
,
SIFParam
*
rigth
,
SIFParam
*
output
);
static
sif_func_t
sifNullFunc
=
NULL
;
static
void
sifFreeParam
(
SIFParam
*
param
)
{
static
FORCE_INLINE
void
sifFreeParam
(
SIFParam
*
param
)
{
if
(
param
==
NULL
)
return
;
taosArrayDestroy
(
param
->
result
);
...
...
@@ -123,7 +122,7 @@ static void sifFreeParam(SIFParam *param) {
param
->
pFilter
=
NULL
;
}
static
int32_t
sifGetOperParamNum
(
EOperatorType
ty
)
{
static
FORCE_INLINE
int32_t
sifGetOperParamNum
(
EOperatorType
ty
)
{
if
(
OP_TYPE_IS_NULL
==
ty
||
OP_TYPE_IS_NOT_NULL
==
ty
||
OP_TYPE_IS_TRUE
==
ty
||
OP_TYPE_IS_NOT_TRUE
==
ty
||
OP_TYPE_IS_FALSE
==
ty
||
OP_TYPE_IS_NOT_FALSE
==
ty
||
OP_TYPE_IS_UNKNOWN
==
ty
||
OP_TYPE_IS_NOT_UNKNOWN
==
ty
||
OP_TYPE_MINUS
==
ty
)
{
...
...
@@ -131,14 +130,14 @@ static int32_t sifGetOperParamNum(EOperatorType ty) {
}
return
2
;
}
static
int32_t
sifValidOp
(
EOperatorType
ty
)
{
static
FORCE_INLINE
int32_t
sifValidOp
(
EOperatorType
ty
)
{
if
((
ty
>=
OP_TYPE_ADD
&&
ty
<=
OP_TYPE_BIT_OR
)
||
(
ty
==
OP_TYPE_IN
||
ty
==
OP_TYPE_NOT_IN
)
||
(
ty
==
OP_TYPE_LIKE
||
ty
==
OP_TYPE_NOT_LIKE
||
ty
==
OP_TYPE_MATCH
||
ty
==
OP_TYPE_NMATCH
))
{
return
-
1
;
}
return
0
;
}
static
int32_t
sifValidColumn
(
SColumnNode
*
cn
)
{
static
FORCE_INLINE
int32_t
sifValidColumn
(
SColumnNode
*
cn
)
{
// add more check
if
(
cn
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
...
...
@@ -149,7 +148,7 @@ static int32_t sifValidColumn(SColumnNode *cn) {
return
TSDB_CODE_SUCCESS
;
}
static
SIdxFltStatus
sifMergeCond
(
ELogicConditionType
type
,
SIdxFltStatus
ls
,
SIdxFltStatus
rs
)
{
static
FORCE_INLINE
SIdxFltStatus
sifMergeCond
(
ELogicConditionType
type
,
SIdxFltStatus
ls
,
SIdxFltStatus
rs
)
{
// enh rule later
if
(
type
==
LOGIC_COND_TYPE_AND
)
{
if
(
ls
==
SFLT_NOT_INDEX
||
rs
==
SFLT_NOT_INDEX
)
{
...
...
@@ -167,7 +166,7 @@ static SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SI
return
SFLT_NOT_INDEX
;
}
static
int32_t
sifGetValueFromNode
(
SNode
*
node
,
char
**
value
)
{
static
FORCE_INLINE
int32_t
sifGetValueFromNode
(
SNode
*
node
,
char
**
value
)
{
// covert data From snode;
SValueNode
*
vn
=
(
SValueNode
*
)
node
;
...
...
@@ -205,7 +204,7 @@ static int32_t sifGetValueFromNode(SNode *node, char **value) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
sifInitJsonParam
(
SNode
*
node
,
SIFParam
*
param
,
SIFCtx
*
ctx
)
{
static
FORCE_INLINE
int32_t
sifInitJsonParam
(
SNode
*
node
,
SIFParam
*
param
,
SIFCtx
*
ctx
)
{
SOperatorNode
*
nd
=
(
SOperatorNode
*
)
node
;
assert
(
nodeType
(
node
)
==
QUERY_NODE_OPERATOR
);
SColumnNode
*
l
=
(
SColumnNode
*
)
nd
->
pLeft
;
...
...
@@ -355,30 +354,30 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
typedef
int
(
*
Filter
)(
void
*
a
,
void
*
b
,
int16_t
dtype
);
typedef
int
(
*
Filter
Func
)(
void
*
a
,
void
*
b
,
int16_t
dtype
);
int
sifGreaterThan
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
static
FORCE_INLINE
int
sifGreaterThan
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
return
tDoCompare
(
func
,
QUERY_GREATER_THAN
,
a
,
b
);
}
int
sifGreaterEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
static
FORCE_INLINE
int
sifGreaterEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
return
tDoCompare
(
func
,
QUERY_GREATER_EQUAL
,
a
,
b
);
}
int
sifLessEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
static
FORCE_INLINE
int
sifLessEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
return
tDoCompare
(
func
,
QUERY_LESS_EQUAL
,
a
,
b
);
}
int
sifLessThan
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
static
FORCE_INLINE
int
sifLessThan
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
return
(
int
)
tDoCompare
(
func
,
QUERY_LESS_THAN
,
a
,
b
);
}
int
sifEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
static
FORCE_INLINE
int
sifEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
//__compar_fn_t func = idxGetCompar(dtype);
return
(
int
)
tDoCompare
(
func
,
QUERY_TERM
,
a
,
b
);
}
static
F
ilter
sifGetFilterFunc
(
EIndexQueryType
type
,
bool
*
reverse
)
{
static
F
ORCE_INLINE
FilterFunc
sifGetFilterFunc
(
EIndexQueryType
type
,
bool
*
reverse
)
{
if
(
type
==
QUERY_LESS_EQUAL
||
type
==
QUERY_LESS_THAN
)
{
*
reverse
=
true
;
}
else
{
...
...
@@ -470,8 +469,8 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
indexMultiTermQueryAdd
(
mtm
,
tm
,
qtype
);
ret
=
indexJsonSearch
(
arg
->
ivtIdx
,
mtm
,
output
->
result
);
}
else
{
bool
reverse
;
Filter
filterFunc
=
sifGetFilterFunc
(
qtype
,
&
reverse
);
bool
reverse
;
Filter
Func
filterFunc
=
sifGetFilterFunc
(
qtype
,
&
reverse
);
SMetaFltParam
param
=
{.
suid
=
arg
->
suid
,
.
cid
=
left
->
colId
,
...
...
@@ -498,72 +497,72 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
return
ret
;
}
static
int32_t
sifLessThanFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifLessThanFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_LOWER_THAN
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifLessEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifLessEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_LOWER_EQUAL
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifGreaterThanFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifGreaterThanFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_GREATER_THAN
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifGreaterEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifGreaterEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_GREATER_EQUAL
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_EQUAL
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifNotEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifNotEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_NOT_EQUAL
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifInFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifInFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_IN
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifNotInFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifNotInFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_NOT_IN
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifLikeFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifLikeFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_LIKE
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifNotLikeFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifNotLikeFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_NOT_LIKE
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifMatchFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifMatchFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_MATCH
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifNotMatchFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifNotMatchFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_NMATCH
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifJsonContains
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifJsonContains
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_JSON_CONTAINS
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifJsonGetValue
(
SIFParam
*
left
,
SIFParam
*
rigth
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifJsonGetValue
(
SIFParam
*
left
,
SIFParam
*
rigth
,
SIFParam
*
output
)
{
// return 0
return
0
;
}
static
int32_t
sifDefaultFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifDefaultFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
// add more except
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
static
int32_t
sifGetOperFn
(
int32_t
funcId
,
sif_func_t
*
func
,
SIdxFltStatus
*
status
)
{
static
FORCE_INLINE
int32_t
sifGetOperFn
(
int32_t
funcId
,
sif_func_t
*
func
,
SIdxFltStatus
*
status
)
{
// impl later
*
status
=
SFLT_ACCURATE_INDEX
;
switch
(
funcId
)
{
...
...
source/libs/transport/src/thttp.c
浏览文件 @
68a1c12d
...
...
@@ -126,22 +126,22 @@ _OVER:
return
code
;
}
static
void
destroyHttpClient
(
SHttpClient
*
cli
)
{
static
FORCE_INLINE
void
destroyHttpClient
(
SHttpClient
*
cli
)
{
taosMemoryFree
(
cli
->
wbuf
);
taosMemoryFree
(
cli
->
rbuf
);
taosMemoryFree
(
cli
->
addr
);
taosMemoryFree
(
cli
);
}
static
void
clientCloseCb
(
uv_handle_t
*
handle
)
{
static
FORCE_INLINE
void
clientCloseCb
(
uv_handle_t
*
handle
)
{
SHttpClient
*
cli
=
handle
->
data
;
destroyHttpClient
(
cli
);
}
static
void
clientAllocBuffCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
static
FORCE_INLINE
void
clientAllocBuffCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
SHttpClient
*
cli
=
handle
->
data
;
buf
->
base
=
cli
->
rbuf
;
buf
->
len
=
HTTP_RECV_BUF_SIZE
;
}
static
void
clientRecvCb
(
uv_stream_t
*
handle
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
static
FORCE_INLINE
void
clientRecvCb
(
uv_stream_t
*
handle
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
SHttpClient
*
cli
=
handle
->
data
;
if
(
nread
<
0
)
{
uError
(
"http-report recv error:%s"
,
uv_err_name
(
nread
));
...
...
@@ -173,7 +173,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
uv_write
(
&
cli
->
req
,
(
uv_stream_t
*
)
&
cli
->
tcp
,
cli
->
wbuf
,
2
,
clientSentCb
);
}
static
int32_t
taosBuildDstAddr
(
const
char
*
server
,
uint16_t
port
,
struct
sockaddr_in
*
dest
)
{
static
FORCE_INLINE
int32_t
taosBuildDstAddr
(
const
char
*
server
,
uint16_t
port
,
struct
sockaddr_in
*
dest
)
{
uint32_t
ip
=
taosGetIpv4FromFqdn
(
server
);
if
(
ip
==
0xffffffff
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
68a1c12d
...
...
@@ -69,11 +69,9 @@ typedef struct SCliThrd {
SAsyncPool
*
asyncPool
;
uv_prepare_t
*
prepare
;
void
*
pool
;
// conn pool
// timer handles
SArray
*
timerList
;
// msg queue
queue
msg
;
TdThreadMutex
msgMtx
;
SDelayQueue
*
delayQueue
;
...
...
@@ -108,7 +106,7 @@ static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn
// static void cliTimeoutCb(uv_timer_t* handle);
// alloc buffer for recv
static
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
static
FORCE_INLINE
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
// callback after recv nbytes from socket
static
void
cliRecvCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
// callback after send data to socket
...
...
@@ -132,10 +130,10 @@ static void cliSend(SCliConn* pConn);
static
void
cliDestroyConnMsgs
(
SCliConn
*
conn
,
bool
destroy
);
// cli util func
static
bool
cliIsEpsetUpdated
(
int32_t
code
,
STransConnCtx
*
pCtx
);
static
void
cliMayCvtFqdnToIp
(
SEpSet
*
pEpSet
,
SCvtAddr
*
pCvtAddr
);
static
FORCE_INLINE
bool
cliIsEpsetUpdated
(
int32_t
code
,
STransConnCtx
*
pCtx
);
static
FORCE_INLINE
void
cliMayCvtFqdnToIp
(
SEpSet
*
pEpSet
,
SCvtAddr
*
pCvtAddr
);
static
int32_t
cliBuildExceptResp
(
SCliMsg
*
pMsg
,
STransMsg
*
resp
);
static
FORCE_INLINE
int32_t
cliBuildExceptResp
(
SCliMsg
*
pMsg
,
STransMsg
*
resp
);
// process data read from server, add decompress etc later
static
void
cliHandleResp
(
SCliConn
*
conn
);
...
...
@@ -150,12 +148,10 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
static
void
(
*
cliAsyncHandle
[])(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
=
{
cliHandleReq
,
cliHandleQuit
,
cliHandleRelease
,
NULL
,
cliHandleUpdate
};
static
void
cliSendQuit
(
SCliThrd
*
thrd
);
static
void
destroyUserdata
(
STransMsg
*
userdata
);
static
int
cliRBChoseIdx
(
STrans
*
pTransInst
);
static
FORCE_INLINE
void
destroyUserdata
(
STransMsg
*
userdata
);
static
FORCE_INLINE
void
destroyCmsg
(
void
*
cmsg
);
static
FORCE_INLINE
int
cliRBChoseIdx
(
STrans
*
pTransInst
);
static
void
destroyCmsg
(
void
*
cmsg
);
static
void
transDestroyConnCtx
(
STransConnCtx
*
ctx
);
// thread obj
static
SCliThrd
*
createThrdObj
();
...
...
@@ -885,26 +881,23 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
}
return
conn
;
}
void
cliMayCvtFqdnToIp
(
SEpSet
*
pEpSet
,
SCvtAddr
*
pCvtAddr
)
{
FORCE_INLINE
void
cliMayCvtFqdnToIp
(
SEpSet
*
pEpSet
,
SCvtAddr
*
pCvtAddr
)
{
if
(
pCvtAddr
->
cvt
==
false
)
{
return
;
}
for
(
int
i
=
0
;
i
<
pEpSet
->
numOfEps
&&
pEpSet
->
numOfEps
==
1
;
i
++
)
{
if
(
strncmp
(
pEpSet
->
eps
[
i
].
fqdn
,
pCvtAddr
->
fqdn
,
TSDB_FQDN_LEN
)
==
0
)
{
memset
(
pEpSet
->
eps
[
i
].
fqdn
,
0
,
TSDB_FQDN_LEN
);
memcpy
(
pEpSet
->
eps
[
i
].
fqdn
,
pCvtAddr
->
ip
,
TSDB_FQDN_LEN
);
}
if
(
pEpSet
->
numOfEps
==
1
&&
strncmp
(
pEpSet
->
eps
[
0
].
fqdn
,
pCvtAddr
->
fqdn
,
TSDB_FQDN_LEN
)
==
0
)
{
memset
(
pEpSet
->
eps
[
0
].
fqdn
,
0
,
TSDB_FQDN_LEN
);
memcpy
(
pEpSet
->
eps
[
0
].
fqdn
,
pCvtAddr
->
ip
,
TSDB_FQDN_LEN
);
}
}
bool
cliIsEpsetUpdated
(
int32_t
code
,
STransConnCtx
*
pCtx
)
{
FORCE_INLINE
bool
cliIsEpsetUpdated
(
int32_t
code
,
STransConnCtx
*
pCtx
)
{
if
(
code
!=
0
)
return
false
;
if
(
pCtx
->
retryCnt
==
0
)
return
false
;
if
(
transEpSetIsEqual
(
&
pCtx
->
epSet
,
&
pCtx
->
origEpSet
))
return
false
;
return
true
;
}
int32_t
cliBuildExceptResp
(
SCliMsg
*
pMsg
,
STransMsg
*
pResp
)
{
FORCE_INLINE
int32_t
cliBuildExceptResp
(
SCliMsg
*
pMsg
,
STransMsg
*
pResp
)
{
if
(
pMsg
==
NULL
)
return
-
1
;
memset
(
pResp
,
0
,
sizeof
(
STransMsg
));
...
...
@@ -1128,14 +1121,15 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
return
cli
;
}
static
void
destroyUserdata
(
STransMsg
*
userdata
)
{
FORCE_INLINE
void
destroyUserdata
(
STransMsg
*
userdata
)
{
if
(
userdata
->
pCont
==
NULL
)
{
return
;
}
transFreeMsg
(
userdata
->
pCont
);
userdata
->
pCont
=
NULL
;
}
static
void
destroyCmsg
(
void
*
arg
)
{
FORCE_INLINE
void
destroyCmsg
(
void
*
arg
)
{
SCliMsg
*
pMsg
=
arg
;
if
(
pMsg
==
NULL
)
{
return
;
...
...
@@ -1220,7 +1214,7 @@ void cliWalkCb(uv_handle_t* handle, void* arg) {
}
}
int
cliRBChoseIdx
(
STrans
*
pTransInst
)
{
FORCE_INLINE
int
cliRBChoseIdx
(
STrans
*
pTransInst
)
{
int8_t
index
=
pTransInst
->
index
;
if
(
pTransInst
->
numOfThreads
==
0
)
{
return
-
1
;
...
...
@@ -1230,7 +1224,7 @@ int cliRBChoseIdx(STrans* pTransInst) {
}
return
index
%
pTransInst
->
numOfThreads
;
}
static
void
doDelayTask
(
void
*
param
)
{
static
FORCE_INLINE
void
doDelayTask
(
void
*
param
)
{
STaskArg
*
arg
=
param
;
SCliMsg
*
pMsg
=
arg
->
param1
;
SCliThrd
*
pThrd
=
arg
->
param2
;
...
...
@@ -1264,13 +1258,13 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
transDQSched
(
pThrd
->
delayQueue
,
doDelayTask
,
arg
,
TRANS_RETRY_INTERVAL
);
}
void
cliCompareAndSwap
(
int8_t
*
val
,
int8_t
exp
,
int8_t
newVal
)
{
FORCE_INLINE
void
cliCompareAndSwap
(
int8_t
*
val
,
int8_t
exp
,
int8_t
newVal
)
{
if
(
*
val
!=
exp
)
{
*
val
=
newVal
;
}
}
bool
cliTryExtractEpSet
(
STransMsg
*
pResp
,
SEpSet
*
dst
)
{
FORCE_INLINE
bool
cliTryExtractEpSet
(
STransMsg
*
pResp
,
SEpSet
*
dst
)
{
if
((
pResp
==
NULL
||
pResp
->
info
.
hasEpSet
==
0
))
{
return
false
;
}
...
...
@@ -1402,7 +1396,7 @@ void transUnrefCliHandle(void* handle) {
cliDestroyConn
((
SCliConn
*
)
handle
,
true
);
}
}
SCliThrd
*
transGetWorkThrdFromHandle
(
int64_t
handle
,
bool
*
validHandle
)
{
static
FORCE_INLINE
SCliThrd
*
transGetWorkThrdFromHandle
(
int64_t
handle
,
bool
*
validHandle
)
{
SCliThrd
*
pThrd
=
NULL
;
SExHandle
*
exh
=
transAcquireExHandle
(
transGetRefMgt
(),
handle
);
if
(
exh
==
NULL
)
{
...
...
source/libs/transport/src/transComm.c
浏览文件 @
68a1c12d
...
...
@@ -424,7 +424,7 @@ void transQueueDestroy(STransQueue* queue) {
taosArrayDestroy
(
queue
->
q
);
}
static
int32_t
timeCompare
(
const
HeapNode
*
a
,
const
HeapNode
*
b
)
{
static
FORCE_INLINE
int32_t
timeCompare
(
const
HeapNode
*
a
,
const
HeapNode
*
b
)
{
SDelayTask
*
arg1
=
container_of
(
a
,
SDelayTask
,
node
);
SDelayTask
*
arg2
=
container_of
(
b
,
SDelayTask
,
node
);
if
(
arg1
->
execTime
>
arg2
->
execTime
)
{
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
68a1c12d
...
...
@@ -125,17 +125,17 @@ static void uvWorkAfterTask(uv_work_t* req, int status);
static
void
uvWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
);
static
void
uvFreeCb
(
uv_handle_t
*
handle
);
static
void
uvStartSendRespImpl
(
SSvrMsg
*
smsg
);
static
FORCE_INLINE
void
uvStartSendRespImpl
(
SSvrMsg
*
smsg
);
static
void
uvPrepareSendData
(
SSvrMsg
*
msg
,
uv_buf_t
*
wb
);
static
void
uvStartSendResp
(
SSvrMsg
*
msg
);
static
void
uvNotifyLinkBrokenToApp
(
SSvrConn
*
conn
);
static
void
destroySmsg
(
SSvrMsg
*
smsg
);
// check whether already read complete packet
static
SSvrConn
*
createConn
(
void
*
hThrd
);
static
void
destroyConn
(
SSvrConn
*
conn
,
bool
clear
/*clear handle or not*/
);
static
void
destroyConnRegArg
(
SSvrConn
*
conn
);
static
FORCE_INLINE
void
destroySmsg
(
SSvrMsg
*
smsg
);
static
FORCE_INLINE
SSvrConn
*
createConn
(
void
*
hThrd
);
static
FORCE_INLINE
void
destroyConn
(
SSvrConn
*
conn
,
bool
clear
/*clear handle or not*/
);
static
FORCE_INLINE
void
destroyConnRegArg
(
SSvrConn
*
conn
);
static
int
reallocConnRef
(
SSvrConn
*
conn
);
...
...
@@ -413,7 +413,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
wb
->
len
=
len
;
}
static
void
uvStartSendRespImpl
(
SSvrMsg
*
smsg
)
{
static
FORCE_INLINE
void
uvStartSendRespImpl
(
SSvrMsg
*
smsg
)
{
SSvrConn
*
pConn
=
smsg
->
pConn
;
if
(
pConn
->
broken
)
{
return
;
...
...
@@ -447,7 +447,7 @@ static void uvStartSendResp(SSvrMsg* smsg) {
return
;
}
static
void
destroySmsg
(
SSvrMsg
*
smsg
)
{
static
FORCE_INLINE
void
destroySmsg
(
SSvrMsg
*
smsg
)
{
if
(
smsg
==
NULL
)
{
return
;
}
...
...
@@ -812,7 +812,7 @@ void* transWorkerThread(void* arg) {
return
NULL
;
}
static
SSvrConn
*
createConn
(
void
*
hThrd
)
{
static
FORCE_INLINE
SSvrConn
*
createConn
(
void
*
hThrd
)
{
SWorkThrd
*
pThrd
=
hThrd
;
SSvrConn
*
pConn
=
(
SSvrConn
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSvrConn
));
...
...
@@ -842,7 +842,7 @@ static SSvrConn* createConn(void* hThrd) {
return
pConn
;
}
static
void
destroyConn
(
SSvrConn
*
conn
,
bool
clear
)
{
static
FORCE_INLINE
void
destroyConn
(
SSvrConn
*
conn
,
bool
clear
)
{
if
(
conn
==
NULL
)
{
return
;
}
...
...
@@ -854,7 +854,7 @@ static void destroyConn(SSvrConn* conn, bool clear) {
}
}
}
static
void
destroyConnRegArg
(
SSvrConn
*
conn
)
{
static
FORCE_INLINE
void
destroyConnRegArg
(
SSvrConn
*
conn
)
{
if
(
conn
->
regArg
.
init
==
1
)
{
transFreeMsg
(
conn
->
regArg
.
msg
.
pCont
);
conn
->
regArg
.
init
=
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录