Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
daefe101
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
daefe101
编写于
4月 20, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/check
上级
2702903e
8213315f
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
217 addition
and
129 deletion
+217
-129
include/common/trow.h
include/common/trow.h
+26
-2
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/client/src/clientHb.c
source/client/src/clientHb.c
+34
-4
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+12
-5
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+14
-6
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-1
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+16
-6
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+2
-2
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+1
-4
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+7
-17
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+11
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+32
-11
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+1
-1
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+58
-70
未找到文件。
include/common/trow.h
浏览文件 @
daefe101
...
...
@@ -651,6 +651,8 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
TD_ROW_SET_INFO
(
pBuilder
->
pBuf
,
0
);
TD_ROW_SET_TYPE
(
pBuilder
->
pBuf
,
pBuilder
->
rowType
);
TASSERT
(
pBuilder
->
nBitmaps
>
0
&&
pBuilder
->
flen
>
0
);
uint32_t
len
=
0
;
switch
(
pBuilder
->
rowType
)
{
case
TD_ROW_TP
:
...
...
@@ -1165,6 +1167,18 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief
*
* @param pRow
* @param colId
* @param colType
* @param flen
* @param offset
* @param colIdx start from 0
* @param pVal
* @return FORCE_INLINE
*/
static
FORCE_INLINE
bool
tdSTpRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
col_type_t
colType
,
int32_t
flen
,
uint32_t
offset
,
col_id_t
colIdx
,
SCellVal
*
pVal
)
{
if
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
...
...
@@ -1172,10 +1186,20 @@ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t
return
true
;
}
void
*
pBitmap
=
tdGetBitmapAddrTp
(
pRow
,
flen
);
tdGetTpRowValOfCol
(
pVal
,
pRow
,
pBitmap
,
colType
,
offset
-
sizeof
(
TSKEY
),
colIdx
-
1
);
tdGetTpRowValOfCol
(
pVal
,
pRow
,
pBitmap
,
colType
,
offset
-
sizeof
(
TSKEY
),
colIdx
);
return
true
;
}
/**
* @brief
*
* @param pRow
* @param colId
* @param offset
* @param colIdx start from 0
* @param pVal
* @return FORCE_INLINE
*/
static
FORCE_INLINE
bool
tdSKvRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
uint32_t
offset
,
col_id_t
colIdx
,
SCellVal
*
pVal
)
{
if
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
...
...
@@ -1183,7 +1207,7 @@ static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t o
return
true
;
}
void
*
pBitmap
=
tdGetBitmapAddrKv
(
pRow
,
tdRowGetNCols
(
pRow
));
tdGetKvRowValOfCol
(
pVal
,
pRow
,
pBitmap
,
offset
,
colIdx
-
1
);
tdGetKvRowValOfCol
(
pVal
,
pRow
,
pBitmap
,
offset
,
colIdx
);
return
true
;
}
...
...
include/util/taoserror.h
浏览文件 @
daefe101
...
...
@@ -617,6 +617,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FUNC_FUNTION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2800)
#define TSDB_CODE_FUNC_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2801)
#define TSDB_CODE_FUNC_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2802)
#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803)
#ifdef __cplusplus
}
...
...
source/client/src/clientHb.c
浏览文件 @
daefe101
...
...
@@ -630,10 +630,29 @@ void appHbMgrCleanup(void) {
int
sz
=
taosArrayGetSize
(
clientHbMgr
.
appHbMgrs
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SAppHbMgr
*
pTarget
=
taosArrayGetP
(
clientHbMgr
.
appHbMgrs
,
i
);
void
*
pIter
=
taosHashIterate
(
pTarget
->
activeInfo
,
NULL
);
while
(
pIter
!=
NULL
)
{
SClientHbReq
*
pOneReq
=
pIter
;
hbFreeReq
(
pOneReq
);
taosHashCleanup
(
pOneReq
->
info
);
pIter
=
taosHashIterate
(
pTarget
->
activeInfo
,
pIter
);
}
taosHashCleanup
(
pTarget
->
activeInfo
);
pTarget
->
activeInfo
=
NULL
;
pIter
=
taosHashIterate
(
pTarget
->
connInfo
,
NULL
);
while
(
pIter
!=
NULL
)
{
SHbConnInfo
*
info
=
pIter
;
taosMemoryFree
(
info
->
param
);
pIter
=
taosHashIterate
(
pTarget
->
connInfo
,
pIter
);
}
taosHashCleanup
(
pTarget
->
connInfo
);
pTarget
->
connInfo
=
NULL
;
taosMemoryFree
(
pTarget
->
key
);
taosMemoryFree
(
pTarget
);
}
}
...
...
@@ -716,12 +735,23 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in
}
void
hbDeregisterConn
(
SAppHbMgr
*
pAppHbMgr
,
SClientHbKey
connKey
)
{
int32_t
code
=
0
;
code
=
taosHashRemove
(
pAppHbMgr
->
activeInfo
,
&
connKey
,
sizeof
(
SClientHbKey
));
code
=
taosHashRemove
(
pAppHbMgr
->
connInfo
,
&
connKey
,
sizeof
(
SClientHbKey
));
if
(
code
)
{
SClientHbReq
*
pReq
=
taosHashGet
(
pAppHbMgr
->
activeInfo
,
&
connKey
,
sizeof
(
SClientHbKey
));
if
(
pReq
)
{
hbFreeReq
(
pReq
);
taosHashCleanup
(
pReq
->
info
);
taosHashRemove
(
pAppHbMgr
->
activeInfo
,
&
connKey
,
sizeof
(
SClientHbKey
));
}
SHbConnInfo
*
info
=
taosHashGet
(
pAppHbMgr
->
connInfo
,
&
connKey
,
sizeof
(
SClientHbKey
));
if
(
info
)
{
taosMemoryFree
(
info
->
param
);
taosHashRemove
(
pAppHbMgr
->
connInfo
,
&
connKey
,
sizeof
(
SClientHbKey
));
}
if
(
NULL
==
pReq
||
NULL
==
info
)
{
return
;
}
atomic_sub_fetch_32
(
&
pAppHbMgr
->
connKeyCnt
,
1
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
daefe101
...
...
@@ -1561,12 +1561,19 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
if
(
isChosenRowDataRow
)
{
colId
=
pSchema
->
columns
[
chosen_itr
].
colId
;
offset
=
pSchema
->
columns
[
chosen_itr
].
offset
;
tdSTpRowGetVal
(
row
,
colId
,
pSchema
->
columns
[
chosen_itr
].
type
,
pSchema
->
flen
,
offset
,
chosen_itr
,
&
sVal
);
// TODO: use STSRowIter
tdSTpRowGetVal
(
row
,
colId
,
pSchema
->
columns
[
chosen_itr
].
type
,
pSchema
->
flen
,
offset
,
chosen_itr
-
1
,
&
sVal
);
}
else
{
SKvRowIdx
*
pColIdx
=
tdKvRowColIdxAt
(
row
,
chosen_itr
);
colId
=
pColIdx
->
colId
;
offset
=
pColIdx
->
offset
;
tdSKvRowGetVal
(
row
,
colId
,
offset
,
chosen_itr
,
&
sVal
);
// TODO: use STSRowIter
if
(
chosen_itr
==
0
)
{
colId
=
PRIMARYKEY_TIMESTAMP_COL_ID
;
tdSKvRowGetVal
(
row
,
PRIMARYKEY_TIMESTAMP_COL_ID
,
-
1
,
-
1
,
&
sVal
);
}
else
{
SKvRowIdx
*
pColIdx
=
tdKvRowColIdxAt
(
row
,
chosen_itr
-
1
);
colId
=
pColIdx
->
colId
;
offset
=
pColIdx
->
offset
;
tdSKvRowGetVal
(
row
,
colId
,
offset
,
chosen_itr
-
1
,
&
sVal
);
}
}
if
(
colId
==
pColInfo
->
info
.
colId
)
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
daefe101
...
...
@@ -490,6 +490,7 @@ typedef struct SGroupbyOperatorInfo {
SExprInfo
*
pScalarExprInfo
;
int32_t
numOfScalarExpr
;
// the number of scalar expression in group operator
SqlFunctionCtx
*
pScalarFuncCtx
;
int32_t
*
rowCellInfoOffset
;
// offset value for each row result cell info
}
SGroupbyOperatorInfo
;
typedef
struct
SDataGroupInfo
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
daefe101
...
...
@@ -1173,6 +1173,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
setPseudoOutputColInfo
(
pResult
,
pCtx
,
pPseudoList
);
pResult
->
info
.
groupId
=
pSrcBlock
->
info
.
groupId
;
// if the source equals to the destination, it is to create a new column as the result of scalar function or some operators.
bool
createNewColModel
=
(
pResult
==
pSrcBlock
);
int32_t
numOfRows
=
0
;
for
(
int32_t
k
=
0
;
k
<
numOfOutput
;
++
k
)
{
...
...
@@ -1181,7 +1184,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
if
(
pExpr
[
k
].
pExpr
->
nodeType
==
QUERY_NODE_COLUMN
)
{
// it is a project query
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pResult
->
pDataBlock
,
outputSlotId
);
if
(
pResult
->
info
.
rows
>
0
)
{
if
(
pResult
->
info
.
rows
>
0
&&
!
createNewColModel
)
{
colDataMergeCol
(
pColInfoData
,
pResult
->
info
.
rows
,
pfCtx
->
input
.
pData
[
0
],
pfCtx
->
input
.
numOfRows
);
}
else
{
colDataAssign
(
pColInfoData
,
pfCtx
->
input
.
pData
[
0
],
pfCtx
->
input
.
numOfRows
);
...
...
@@ -1191,7 +1194,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
}
else
if
(
pExpr
[
k
].
pExpr
->
nodeType
==
QUERY_NODE_VALUE
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pResult
->
pDataBlock
,
outputSlotId
);
int32_t
offset
=
pResult
->
info
.
rows
;
int32_t
offset
=
createNewColModel
?
0
:
pResult
->
info
.
rows
;
for
(
int32_t
i
=
0
;
i
<
pSrcBlock
->
info
.
rows
;
++
i
)
{
colDataAppend
(
pColInfoData
,
i
+
offset
,
taosVariantGet
(
&
pExpr
[
k
].
base
.
pParam
[
0
].
param
,
pExpr
[
k
].
base
.
pParam
[
0
].
param
.
nType
),
TSDB_DATA_TYPE_NULL
==
pExpr
[
k
].
base
.
pParam
[
0
].
param
.
nType
);
}
...
...
@@ -1207,7 +1210,8 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
SScalarParam
dest
=
{.
columnData
=
&
idata
};
scalarCalculate
(
pExpr
[
k
].
pExpr
->
_optrRoot
.
pRootNode
,
pBlockList
,
&
dest
);
colDataMergeCol
(
pResColData
,
pResult
->
info
.
rows
,
&
idata
,
dest
.
numOfRows
);
int32_t
startOffset
=
createNewColModel
?
0
:
pResult
->
info
.
rows
;
colDataMergeCol
(
pResColData
,
startOffset
,
&
idata
,
dest
.
numOfRows
);
numOfRows
=
dest
.
numOfRows
;
taosArrayDestroy
(
pBlockList
);
...
...
@@ -1224,7 +1228,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
pfCtx
->
fpSet
.
init
(
&
pCtx
[
k
],
pResInfo
);
pfCtx
->
pOutput
=
taosArrayGet
(
pResult
->
pDataBlock
,
outputSlotId
);
pfCtx
->
offset
=
pResult
->
info
.
rows
;
// set the start offset
pfCtx
->
offset
=
createNewColModel
?
0
:
pResult
->
info
.
rows
;
// set the start offset
// set the timestamp(_rowts) output buffer
if
(
taosArrayGetSize
(
pPseudoList
)
>
0
)
{
...
...
@@ -1242,7 +1246,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
SScalarParam
dest
=
{.
columnData
=
&
idata
};
scalarCalculate
((
SNode
*
)
pExpr
[
k
].
pExpr
->
_function
.
pFunctNode
,
pBlockList
,
&
dest
);
colDataMergeCol
(
pResColData
,
pResult
->
info
.
rows
,
&
idata
,
dest
.
numOfRows
);
int32_t
startOffset
=
createNewColModel
?
0
:
pResult
->
info
.
rows
;
colDataMergeCol
(
pResColData
,
startOffset
,
&
idata
,
dest
.
numOfRows
);
numOfRows
=
dest
.
numOfRows
;
taosArrayDestroy
(
pBlockList
);
...
...
@@ -1252,7 +1258,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock*
}
}
pResult
->
info
.
rows
+=
numOfRows
;
if
(
!
createNewColModel
)
{
pResult
->
info
.
rows
+=
numOfRows
;
}
}
void
doTimeWindowInterpolation
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pInfo
,
SArray
*
pDataBlock
,
TSKEY
prevTs
,
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
daefe101
...
...
@@ -341,7 +341,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pInfo
->
pScalarExprInfo
=
pScalarExprInfo
;
pInfo
->
numOfScalarExpr
=
numOfScalarExpr
;
pInfo
->
pScalarFuncCtx
=
createSqlFunctionCtx
(
pExprInfo
,
numOfCols
,
&
pInfo
->
binfo
.
rowCellInfoOffset
);
pInfo
->
pScalarFuncCtx
=
createSqlFunctionCtx
(
pScalarExprInfo
,
numOfScalarExpr
,
&
pInfo
->
rowCellInfoOffset
);
int32_t
code
=
initGroupOptrInfo
(
&
pInfo
->
pGroupColVals
,
&
pInfo
->
groupKeyLen
,
&
pInfo
->
keyBuf
,
pGroupColList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
source/libs/function/src/builtins.c
浏览文件 @
daefe101
...
...
@@ -28,11 +28,15 @@ static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, cons
}
static
int32_t
invaildFuncParaNumErrMsg
(
char
*
pErrBuf
,
int32_t
len
,
const
char
*
pFuncName
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_PARA_NUM
,
"Invalid number of
argument
s : %s"
,
pFuncName
);
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_PARA_NUM
,
"Invalid number of
parameter
s : %s"
,
pFuncName
);
}
static
int32_t
invaildFuncParaTypeErrMsg
(
char
*
pErrBuf
,
int32_t
len
,
const
char
*
pFuncName
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_PARA_TYPE
,
"Inconsistent datatypes : %s"
,
pFuncName
);
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_PARA_TYPE
,
"Invalid datatypes : %s"
,
pFuncName
);
}
static
int32_t
invaildFuncParaValueErrMsg
(
char
*
pErrBuf
,
int32_t
len
,
const
char
*
pFuncName
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_PARA_VALUE
,
"Invalid parameter value : %s"
,
pFuncName
);
}
// There is only one parameter of numeric type, and the return type is parameter type
...
...
@@ -139,8 +143,9 @@ static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, in
}
static
int32_t
translateTimezone
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
// pseudo column do not need to check parameters
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
TD_TIMEZONE_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
};
SExprNode
*
pPara1
=
(
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
pPara1
->
resType
.
bytes
,
.
type
=
pPara1
->
resType
.
type
};
//pFunc->node.resType = (SDataType){.bytes = TD_TIMEZONE_LEN, .type = TSDB_DATA_TYPE_BINARY};
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -317,10 +322,15 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
uint8_t
para1Type
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
.
type
;
// The function return type has been set during syntax parsing
uint8_t
para2Type
=
pFunc
->
node
.
resType
.
type
;
if
((
TSDB_DATA_TYPE_JSON
==
para1Type
||
TSDB_DATA_TYPE_BLOB
==
para1Type
||
TSDB_DATA_TYPE_MEDIUMBLOB
==
para1Type
)
||
(
TSDB_DATA_TYPE_JSON
==
para2Type
||
TSDB_DATA_TYPE_BLOB
==
para2Type
||
TSDB_DATA_TYPE_MEDIUMBLOB
==
para2Type
))
{
if
(
para2Type
!=
TSDB_DATA_TYPE_BIGINT
&&
para2Type
!=
TSDB_DATA_TYPE_UBIGINT
&&
para2Type
!=
TSDB_DATA_TYPE_VARCHAR
&&
para2Type
!=
TSDB_DATA_TYPE_NCHAR
&&
para2Type
!=
TSDB_DATA_TYPE_TIMESTAMP
)
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
int32_t
para2Bytes
=
pFunc
->
node
.
resType
.
bytes
;
if
(
para2Bytes
<=
0
)
{
//non-positive value or overflow
return
invaildFuncParaValueErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
daefe101
...
...
@@ -901,7 +901,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
TSKEY
cts
=
getRowPTs
(
pInput
->
pPTS
,
i
);
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
buf
+
bytes
)
>
cts
)
{
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
buf
+
bytes
)
<
cts
)
{
memcpy
(
buf
,
data
,
bytes
);
*
(
TSKEY
*
)(
buf
+
bytes
)
=
cts
;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
...
...
@@ -919,7 +919,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
char
*
data
=
colDataGetData
(
pInputCol
,
i
);
TSKEY
cts
=
getRowPTs
(
pInput
->
pPTS
,
i
);
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
buf
+
bytes
)
>
cts
)
{
if
(
pResInfo
->
numOfRes
==
0
||
*
(
TSKEY
*
)(
buf
+
bytes
)
<
cts
)
{
memcpy
(
buf
,
data
,
bytes
);
*
(
TSKEY
*
)(
buf
+
bytes
)
=
cts
;
pResInfo
->
numOfRes
=
1
;
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
daefe101
...
...
@@ -888,10 +888,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
if
(
PRIMARYKEY_TIMESTAMP_COL_ID
==
pSchema
->
colId
)
{
TSKEY
tsKey
=
TD_ROW_KEY
(
row
);
if
(
checkTimestamp
(
pDataBlocks
,
(
const
char
*
)
&
tsKey
)
!=
TSDB_CODE_SUCCESS
)
{
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"client time/server time can not be mixed up"
,
sToken
.
z
);
return
TSDB_CODE_TSC_INVALID_TIME_STAMP
;
}
checkTimestamp
(
pDataBlocks
,
(
const
char
*
)
&
tsKey
);
}
}
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
daefe101
...
...
@@ -659,27 +659,22 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t
castFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
int16_t
inputType
=
pInput
[
0
].
columnData
->
info
.
type
;
int16_t
outputType
=
pOutput
[
0
].
columnData
->
info
.
type
;
if
(
outputType
!=
TSDB_DATA_TYPE_BIGINT
&&
outputType
!=
TSDB_DATA_TYPE_UBIGINT
&&
outputType
!=
TSDB_DATA_TYPE_VARCHAR
&&
outputType
!=
TSDB_DATA_TYPE_NCHAR
&&
outputType
!=
TSDB_DATA_TYPE_TIMESTAMP
)
{
return
TSDB_CODE_FAILED
;
}
int64_t
outputLen
=
pOutput
[
0
].
columnData
->
info
.
bytes
;
char
*
input
=
NULL
;
if
(
IS_VAR_DATA_TYPE
(
outputType
))
{
int32_t
factor
=
(
TSDB_DATA_TYPE_NCHAR
==
outputType
)
?
TSDB_NCHAR_SIZE
:
1
;
outputLen
=
outputLen
*
factor
+
VARSTR_HEADER_SIZE
;
}
char
*
outputBuf
=
taosMemoryCalloc
(
outputLen
*
pInput
[
0
].
numOfRows
,
1
);
char
*
output
=
outputBuf
;
if
(
IS_VAR_DATA_TYPE
(
inputType
))
{
input
=
pInput
[
0
].
columnData
->
pData
+
pInput
[
0
].
columnData
->
varmeta
.
offset
[
0
];
}
else
{
input
=
pInput
[
0
].
columnData
->
pData
;
}
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInput
[
0
].
columnData
,
i
))
{
colDataAppendNULL
(
pOutput
->
columnData
,
i
);
continue
;
}
char
*
input
=
colDataGetData
(
pInput
[
0
].
columnData
,
i
);
switch
(
outputType
)
{
case
TSDB_DATA_TYPE_BIGINT
:
{
...
...
@@ -736,7 +731,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
}
else
if
(
inputType
==
TSDB_DATA_TYPE_BINARY
)
{
int32_t
len
=
sprintf
(
varDataVal
(
output
),
"%.*s"
,
(
int32_t
)(
outputLen
-
VARSTR_HEADER_SIZE
),
varDataVal
(
input
));
varDataSetLen
(
output
,
len
);
}
else
if
(
inputType
==
TSDB_DATA_TYPE_
BINARY
||
inputType
==
TSDB_DATA_TYPE_
NCHAR
)
{
}
else
if
(
inputType
==
TSDB_DATA_TYPE_NCHAR
)
{
//not support
return
TSDB_CODE_FAILED
;
}
else
{
...
...
@@ -789,11 +784,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
}
colDataAppend
(
pOutput
->
columnData
,
i
,
output
,
false
);
if
(
IS_VAR_DATA_TYPE
(
inputType
))
{
input
+=
varDataTLen
(
input
);
}
else
{
input
+=
tDataTypes
[
inputType
].
bytes
;
}
if
(
IS_VAR_DATA_TYPE
(
outputType
))
{
output
+=
varDataTLen
(
output
);
}
else
{
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
daefe101
...
...
@@ -2749,4 +2749,15 @@ void schedulerDestroy(void) {
taosCloseRef
(
schMgmt
.
jobRef
);
schMgmt
.
jobRef
=
0
;
}
if
(
schMgmt
.
hbConnections
)
{
void
*
pIter
=
taosHashIterate
(
schMgmt
.
hbConnections
,
NULL
);
while
(
pIter
!=
NULL
)
{
SSchHbTrans
*
hb
=
pIter
;
schFreeRpcCtx
(
&
hb
->
rpcCtx
);
pIter
=
taosHashIterate
(
schMgmt
.
hbConnections
,
pIter
);
}
taosHashCleanup
(
schMgmt
.
hbConnections
);
schMgmt
.
hbConnections
=
NULL
;
}
}
source/libs/transport/src/transCli.c
浏览文件 @
daefe101
...
...
@@ -129,6 +129,15 @@ static void transDestroyConnCtx(STransConnCtx* ctx);
static
SCliThrdObj
*
createThrdObj
();
static
void
destroyThrdObj
(
SCliThrdObj
*
pThrd
);
static
void
cliWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
);
#define CLI_RELEASE_UV(loop) \
do { \
uv_walk(loop, cliWalkCb, NULL); \
uv_run(loop, UV_RUN_DEFAULT); \
uv_loop_close(loop); \
} while (0);
// snprintf may cause performance problem
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
do { \
...
...
@@ -212,8 +221,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
} \
} while (0)
#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_NO_PERSIST_BY_APP(conn) \
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) \
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
...
...
@@ -288,8 +299,9 @@ void cliHandleResp(SCliConn* conn) {
tDebug
(
"%s cli conn %p ref by app"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
}
tDebug
(
"%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
pTransInst
->
label
,
conn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
conn
->
addr
.
sin_addr
),
ntohs
(
conn
->
addr
.
sin_port
),
taosInetNtoa
(
conn
->
locaddr
.
sin_addr
),
ntohs
(
conn
->
locaddr
.
sin_port
),
transMsg
.
contLen
);
tDebug
(
"%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
pTransInst
->
label
,
conn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
conn
->
addr
.
sin_addr
),
ntohs
(
conn
->
addr
.
sin_port
),
taosInetNtoa
(
conn
->
locaddr
.
sin_addr
),
ntohs
(
conn
->
locaddr
.
sin_port
),
transMsg
.
contLen
);
conn
->
secured
=
pHead
->
secured
;
...
...
@@ -355,10 +367,12 @@ void cliHandleExcept(SCliConn* pConn) {
if
(
pMsg
==
NULL
&&
!
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
transMsg
.
ahandle
=
transCtxDumpVal
(
&
pConn
->
ctx
,
transMsg
.
msgType
);
tDebug
(
"%s cli conn %p construct ahandle %p by %s"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
transMsg
.
ahandle
,
TMSG_INFO
(
transMsg
.
msgType
));
tDebug
(
"%s cli conn %p construct ahandle %p by %s"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
transMsg
.
ahandle
,
TMSG_INFO
(
transMsg
.
msgType
));
if
(
transMsg
.
ahandle
==
NULL
)
{
transMsg
.
ahandle
=
transCtxDumpBrokenlinkVal
(
&
pConn
->
ctx
,
(
int32_t
*
)
&
(
transMsg
.
msgType
));
tDebug
(
"%s cli conn %p construct ahandle %p due to brokenlink"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
transMsg
.
ahandle
);
tDebug
(
"%s cli conn %p construct ahandle %p due to brokenlink"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
transMsg
.
ahandle
);
}
}
else
{
transMsg
.
ahandle
=
pCtx
?
pCtx
->
ahandle
:
NULL
;
...
...
@@ -631,8 +645,9 @@ void cliSend(SCliConn* pConn) {
pHead
->
release
=
REQUEST_RELEASE_HANDLE
(
pCliMsg
)
?
1
:
0
;
uv_buf_t
wb
=
uv_buf_init
((
char
*
)
pHead
,
msgLen
);
tDebug
(
"%s cli conn %p %s is send to %s:%d, local info %s:%d"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
locaddr
.
sin_addr
),
ntohs
(
pConn
->
locaddr
.
sin_port
));
tDebug
(
"%s cli conn %p %s is send to %s:%d, local info %s:%d"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
locaddr
.
sin_addr
),
ntohs
(
pConn
->
locaddr
.
sin_port
));
if
(
pHead
->
persist
==
1
)
{
CONN_SET_PERSIST_BY_APP
(
pConn
);
...
...
@@ -671,9 +686,11 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
destroyCmsg
(
pMsg
);
destroyConnPool
(
pThrd
->
pool
);
uv_timer_stop
(
&
pThrd
->
timer
);
uv_walk
(
pThrd
->
loop
,
cliWalkCb
,
NULL
);
pThrd
->
quit
=
true
;
uv_stop
(
pThrd
->
loop
);
//
uv_stop(pThrd->loop);
}
static
void
cliHandleRelease
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
SCliConn
*
conn
=
pMsg
->
msg
.
handle
;
...
...
@@ -786,7 +803,6 @@ static void* cliWorkThread(void* arg) {
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
arg
;
setThreadName
(
"trans-cli-work"
);
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
return
NULL
;
}
...
...
@@ -851,8 +867,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
if
(
pThrd
==
NULL
)
{
return
;
}
uv_stop
(
pThrd
->
loop
);
taosThreadJoin
(
pThrd
->
thread
,
NULL
);
CLI_RELEASE_UV
(
pThrd
->
loop
);
taosThreadMutexDestroy
(
&
pThrd
->
msgMtx
);
transDestroyAsyncPool
(
pThrd
->
asyncPool
);
...
...
@@ -874,6 +890,11 @@ void cliSendQuit(SCliThrdObj* thrd) {
msg
->
type
=
Quit
;
transSendAsync
(
thrd
->
asyncPool
,
&
msg
->
q
);
}
void
cliWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
)
{
if
(
!
uv_is_closing
(
handle
))
{
uv_close
(
handle
,
NULL
);
}
}
int
cliRBChoseIdx
(
STrans
*
pTransInst
)
{
int64_t
index
=
pTransInst
->
index
;
...
...
source/libs/transport/src/transComm.c
浏览文件 @
daefe101
...
...
@@ -195,7 +195,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
void
transDestroyAsyncPool
(
SAsyncPool
*
pool
)
{
for
(
int
i
=
0
;
i
<
pool
->
nAsync
;
i
++
)
{
uv_async_t
*
async
=
&
(
pool
->
asyncs
[
i
]);
uv_close
((
uv_handle_t
*
)
async
,
NULL
);
//
uv_close((uv_handle_t*)async, NULL);
SAsyncItem
*
item
=
async
->
data
;
taosThreadMutexDestroy
(
&
item
->
mtx
);
taosMemoryFree
(
item
);
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
daefe101
...
...
@@ -93,27 +93,6 @@ typedef struct SServerObj {
static
const
char
*
notify
=
"a"
;
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
static
void
uvAllocConnBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
static
void
uvAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
static
void
uvOnRecvCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
...
...
@@ -125,11 +104,8 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
);
static
void
uvAcceptAsyncCb
(
uv_async_t
*
handle
);
static
void
uvShutDownCb
(
uv_shutdown_t
*
req
,
int
status
);
static
void
uvFreeCb
(
uv_handle_t
*
handle
)
{
//
taosMemoryFree
(
handle
);
}
static
void
uvWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
);
static
void
uvFreeCb
(
uv_handle_t
*
handle
);
static
void
uvStartSendRespInternal
(
SSrvMsg
*
smsg
);
static
void
uvPrepareSendData
(
SSrvMsg
*
msg
,
uv_buf_t
*
wb
);
...
...
@@ -146,7 +122,8 @@ static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static
void
uvHandleRelease
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
);
static
void
uvHandleResp
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
);
static
void
uvHandleRegister
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
);
static
void
(
*
transAsyncHandle
[])(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
)
=
{
uvHandleResp
,
uvHandleQuit
,
uvHandleRelease
,
uvHandleRegister
};
static
void
(
*
transAsyncHandle
[])(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
)
=
{
uvHandleResp
,
uvHandleQuit
,
uvHandleRelease
,
uvHandleRegister
};
static
void
uvDestroyConn
(
uv_handle_t
*
handle
);
...
...
@@ -158,6 +135,34 @@ static void* transAcceptThread(void* arg);
static
bool
addHandleToWorkloop
(
void
*
arg
);
static
bool
addHandleToAcceptloop
(
void
*
arg
);
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
#define SRV_RELEASE_UV(loop) \
do { \
uv_walk(loop, uvWalkCb, NULL); \
uv_run(loop, UV_RUN_DEFAULT); \
uv_loop_close(loop); \
} while (0);
void
uvAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
SSrvConn
*
conn
=
handle
->
data
;
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
...
...
@@ -209,12 +214,13 @@ static void uvHandleReq(SSrvConn* pConn) {
}
if
(
pConn
->
status
==
ConnNormal
&&
pHead
->
noResp
==
0
)
{
transRefSrvHandle
(
pConn
);
tDebug
(
"server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
locaddr
.
sin_addr
),
ntohs
(
pConn
->
locaddr
.
sin_port
),
transMsg
.
contLen
);
tDebug
(
"server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
locaddr
.
sin_addr
),
ntohs
(
pConn
->
locaddr
.
sin_port
),
transMsg
.
contLen
);
}
else
{
tDebug
(
"server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d "
,
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
locaddr
.
sin_addr
),
ntohs
(
pConn
->
loc
addr
.
sin_port
),
transMsg
.
contLen
,
pHead
->
noResp
);
tDebug
(
"server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d "
,
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
t
aosInetNtoa
(
pConn
->
locaddr
.
sin_addr
),
ntohs
(
pConn
->
locaddr
.
sin_port
),
t
ransMsg
.
contLen
,
pHead
->
noResp
);
// no ref here
}
...
...
@@ -354,8 +360,9 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
char
*
msg
=
(
char
*
)
pHead
;
int32_t
len
=
transMsgLenFromCont
(
pMsg
->
contLen
);
tDebug
(
"server conn %p %s is sent to %s:%d, local info: %s:%d"
,
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
locaddr
.
sin_addr
),
ntohs
(
pConn
->
locaddr
.
sin_port
));
tDebug
(
"server conn %p %s is sent to %s:%d, local info: %s:%d"
,
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
locaddr
.
sin_addr
),
ntohs
(
pConn
->
locaddr
.
sin_port
));
pHead
->
msgLen
=
htonl
(
len
);
wb
->
base
=
msg
;
...
...
@@ -367,7 +374,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) {
uvPrepareSendData
(
smsg
,
&
wb
);
SSrvConn
*
pConn
=
smsg
->
pConn
;
uv_timer_stop
(
&
pConn
->
pTimer
);
//
uv_timer_stop(&pConn->pTimer);
uv_write
(
&
pConn
->
pWriter
,
(
uv_stream_t
*
)
pConn
->
pTcp
,
&
wb
,
1
,
uvOnSendCb
);
}
static
void
uvStartSendResp
(
SSrvMsg
*
smsg
)
{
...
...
@@ -436,36 +443,17 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
static
void
uvWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
)
{
if
(
!
uv_is_closing
(
handle
))
{
uv_close
(
handle
,
NULL
);
// uv_unref(handle);
tDebug
(
"handle: %p -----test----"
,
handle
);
}
}
#define MAKE_VALGRIND_HAPPY(loop) \
do { \
uv_walk(loop, uvWalkCb, NULL); \
uv_run(loop, UV_RUN_DEFAULT); \
uv_loop_close(loop); \
} while (0);
static
void
uvFreeCb
(
uv_handle_t
*
handle
)
{
//
taosMemoryFree
(
handle
);
}
static
void
uvAcceptAsyncCb
(
uv_async_t
*
async
)
{
SServerObj
*
srv
=
async
->
data
;
tDebug
(
"close server port %d"
,
srv
->
port
);
uv_walk
(
srv
->
loop
,
uvWalkCb
,
NULL
);
// uv_close((uv_handle_t*)async, NULL);
// uv_close((uv_handle_t*)&srv->server, NULL);
// uv_stop(srv->loop);
// uv_print_all_handles(srv->loop, stderr);
// int ref = uv_loop_alive(srv->loop);
// assert(ref == 0);
// tError("active size %d", ref);
// uv_stop(srv->loop);
// uv_run(srv->loop, UV_RUN_DEFAULT);
// fprintf(stderr, "------------------------------------");
// uv_print_all_handles(srv->loop, stderr);
// int ret = uv_loop_close(srv->loop);
// tError("(loop)->active_reqs.count: %d, ret: %d", (srv->loop)->active_reqs.count, ret);
// assert(ret == 0);
}
static
void
uvShutDownCb
(
uv_shutdown_t
*
req
,
int
status
)
{
...
...
@@ -532,8 +520,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
pConn
->
pTransInst
=
pThrd
->
pTransInst
;
/* init conn timer*/
uv_timer_init
(
pThrd
->
loop
,
&
pConn
->
pTimer
);
pConn
->
pTimer
.
data
=
pConn
;
//
uv_timer_init(pThrd->loop, &pConn->pTimer);
//
pConn->pTimer.data = pConn;
pConn
->
hostThrd
=
pThrd
;
...
...
@@ -677,17 +665,17 @@ static void uvDestroyConn(uv_handle_t* handle) {
SWorkThrdObj
*
thrd
=
conn
->
hostThrd
;
tDebug
(
"server conn %p destroy"
,
conn
);
uv_timer_stop
(
&
conn
->
pTimer
);
//
uv_timer_stop(&conn->pTimer);
transQueueDestroy
(
&
conn
->
srvMsgs
);
QUEUE_REMOVE
(
&
conn
->
queue
);
taosMemoryFree
(
conn
->
pTcp
);
//
taosMemoryFree(conn);
taosMemoryFree
(
conn
);
if
(
thrd
->
quit
&&
QUEUE_IS_EMPTY
(
&
thrd
->
conn
))
{
tTrace
(
"work thread quit"
);
//
uv_walk(thrd->loop, uvWalkCb, NULL);
uv_loop_close
(
thrd
->
loop
);
uv_stop
(
thrd
->
loop
);
uv_walk
(
thrd
->
loop
,
uvWalkCb
,
NULL
);
//
uv_loop_close(thrd->loop);
//
uv_stop(thrd->loop);
}
}
...
...
@@ -749,9 +737,9 @@ End:
void
uvHandleQuit
(
SSrvMsg
*
msg
,
SWorkThrdObj
*
thrd
)
{
thrd
->
quit
=
true
;
if
(
QUEUE_IS_EMPTY
(
&
thrd
->
conn
))
{
//
uv_walk(thrd->loop, uvWalkCb, NULL);
uv_loop_close
(
thrd
->
loop
);
uv_stop
(
thrd
->
loop
);
uv_walk
(
thrd
->
loop
,
uvWalkCb
,
NULL
);
//
uv_loop_close(thrd->loop);
//
uv_stop(thrd->loop);
}
else
{
destroyAllConn
(
thrd
);
}
...
...
@@ -802,7 +790,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
return
;
}
taosThreadJoin
(
pThrd
->
thread
,
NULL
);
// MAKE_VALGRIND_HAPPY
(pThrd->loop);
SRV_RELEASE_UV
(
pThrd
->
loop
);
transDestroyAsyncPool
(
pThrd
->
asyncPool
);
taosMemoryFree
(
pThrd
->
loop
);
taosMemoryFree
(
pThrd
);
...
...
@@ -822,7 +810,7 @@ void transCloseServer(void* arg) {
uv_async_send
(
srv
->
pAcceptAsync
);
taosThreadJoin
(
srv
->
thread
,
NULL
);
MAKE_VALGRIND_HAPPY
(
srv
->
loop
);
SRV_RELEASE_UV
(
srv
->
loop
);
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
sendQuitToWorkThrd
(
srv
->
pThreadObj
[
i
]);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录