Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1c00fd6b
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1c00fd6b
编写于
1月 22, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/warning
上级
3ba31879
963ef70e
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
230 addition
and
159 deletion
+230
-159
include/common/tmsg.h
include/common/tmsg.h
+2
-2
include/libs/planner/planner.h
include/libs/planner/planner.h
+1
-1
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+31
-35
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+4
-2
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+1
-1
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+19
-7
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+18
-2
source/libs/planner/test/phyPlanTests.cpp
source/libs/planner/test/phyPlanTests.cpp
+3
-3
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+9
-9
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+5
-5
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+3
-12
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+75
-56
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+58
-24
未找到文件。
include/common/tmsg.h
浏览文件 @
1c00fd6b
...
...
@@ -696,13 +696,13 @@ typedef struct SVgroupInfo {
uint32_t
hashEnd
;
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupInfo
;
typedef
struct
{
int32_t
vgId
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
}
SVgroupMsg
;
typedef
struct
{
...
...
include/libs/planner/planner.h
浏览文件 @
1c00fd6b
...
...
@@ -178,7 +178,7 @@ struct SQueryNode;
* @param requestId
* @return
*/
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pQueryInfo
,
struct
SQueryDag
**
pDag
,
uint64_t
requestId
);
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pNode
,
struct
SQueryDag
**
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
,
SArray
*
pNodeList
,
uint64_t
requestId
);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
...
...
source/client/inc/clientInt.h
浏览文件 @
1c00fd6b
...
...
@@ -136,6 +136,7 @@ typedef struct SReqResultInfo {
TAOS_ROW
row
;
char
**
pCol
;
uint32_t
numOfRows
;
uint64_t
totalRows
;
uint32_t
current
;
bool
completed
;
}
SReqResultInfo
;
...
...
source/client/src/clientImpl.c
浏览文件 @
1c00fd6b
...
...
@@ -25,9 +25,9 @@
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
);
static
void
setQueryResult
By
Rsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
);
static
void
setQueryResult
From
Rsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
);
static
bool
stringLengthCheck
(
const
char
*
str
,
size_t
maxsize
)
{
static
bool
stringLengthCheck
(
const
char
*
str
,
size_t
maxsize
)
{
if
(
str
==
NULL
)
{
return
false
;
}
...
...
@@ -59,7 +59,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
}
static
STscObj
*
taosConnectImpl
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
);
static
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SDataBlockSchema
*
pDataBlockSchema
);
static
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
if
(
taos_init
()
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -202,43 +202,38 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQueryNode
*
pQueryNode
,
SQueryDag
**
pDag
)
{
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQueryNode
*
pQueryNode
,
SQueryDag
**
pDag
,
SArray
*
pNodeList
)
{
pRequest
->
type
=
pQueryNode
->
type
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
int32_t
code
=
qCreateQueryDag
(
pQueryNode
,
pDag
,
pRequest
->
requestId
);
SSchema
*
pSchema
=
NULL
;
int32_t
numOfCols
=
0
;
int32_t
code
=
qCreateQueryDag
(
pQueryNode
,
pDag
,
&
pSchema
,
&
numOfCols
,
pNodeList
,
pRequest
->
requestId
);
if
(
code
!=
0
)
{
return
code
;
}
if
(
pQueryNode
->
type
==
TSDB_SQL_SELECT
)
{
SArray
*
pa
=
taosArrayGetP
((
*
pDag
)
->
pSubplans
,
0
);
SSubplan
*
pPlan
=
taosArrayGetP
(
pa
,
0
);
SDataBlockSchema
*
pDataBlockSchema
=
&
(
pPlan
->
pDataSink
->
schema
);
setResSchemaInfo
(
pResInfo
,
pDataBlockSchema
);
setResSchemaInfo
(
&
pRequest
->
body
.
resInfo
,
pSchema
,
numOfCols
);
pRequest
->
type
=
TDMT_VND_QUERY
;
}
return
code
;
}
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
S
DataBlockSchema
*
pDataBlockSchema
)
{
assert
(
p
DataBlockSchema
!=
NULL
&&
pDataBlockSchema
->
numOfCols
>
0
);
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
S
Schema
*
pSchema
,
int32_t
numOfCols
)
{
assert
(
p
Schema
!=
NULL
&&
numOfCols
>
0
);
pResInfo
->
numOfCols
=
pDataBlockSchema
->
numOfCols
;
pResInfo
->
fields
=
calloc
(
pDataBlockSchema
->
numOfCols
,
sizeof
(
pDataBlockSchema
->
pSchema
[
0
]));
pResInfo
->
numOfCols
=
numOfCols
;
pResInfo
->
fields
=
calloc
(
numOfCols
,
sizeof
(
pSchema
[
0
]));
for
(
int32_t
i
=
0
;
i
<
pResInfo
->
numOfCols
;
++
i
)
{
SSchema
*
pSchema
=
&
pDataBlockSchema
->
pSchema
[
i
];
pResInfo
->
fields
[
i
].
bytes
=
pSchema
->
bytes
;
pResInfo
->
fields
[
i
].
type
=
pSchema
->
type
;
tstrncpy
(
pResInfo
->
fields
[
i
].
name
,
pSchema
->
name
,
tListLen
(
pResInfo
->
fields
[
i
].
name
));
pResInfo
->
fields
[
i
].
bytes
=
pSchema
[
i
].
bytes
;
pResInfo
->
fields
[
i
].
type
=
pSchema
[
i
].
type
;
tstrncpy
(
pResInfo
->
fields
[
i
].
name
,
pSchema
[
i
].
name
,
tListLen
(
pResInfo
->
fields
[
i
].
name
));
}
}
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
)
{
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
,
SArray
*
pNodeList
)
{
if
(
TSDB_SQL_INSERT
==
pRequest
->
type
||
TSDB_SQL_CREATE_TABLE
==
pRequest
->
type
)
{
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
...
...
@@ -256,14 +251,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
return
pRequest
->
code
;
}
SArray
*
execNode
=
taosArrayInit
(
4
,
sizeof
(
SQueryNodeAddr
));
SQueryNodeAddr
addr
=
{.
numOfEps
=
1
,
.
inUse
=
0
,
.
nodeId
=
2
};
addr
.
epAddr
[
0
].
port
=
7100
;
strcpy
(
addr
.
epAddr
[
0
].
fqdn
,
"localhost"
);
taosArrayPush
(
execNode
,
&
addr
);
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
execNode
,
pDag
,
&
pRequest
->
body
.
pQueryJob
);
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
pQueryJob
);
}
typedef
struct
tmq_t
tmq_t
;
...
...
@@ -399,7 +387,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO
(
qCreateQueryDag
(
pQueryNode
,
&
pRequest
->
body
.
pDag
,
pRequest
->
requestId
),
_return
);
SSchema
*
schema
=
NULL
;
int32_t
numOfCols
=
0
;
CHECK_CODE_GOTO
(
qCreateQueryDag
(
pQueryNode
,
&
pRequest
->
body
.
pDag
,
&
schema
,
&
numOfCols
,
NULL
,
pRequest
->
requestId
),
_return
);
pStr
=
qDagToString
(
pRequest
->
body
.
pDag
);
if
(
pStr
==
NULL
)
{
...
...
@@ -492,6 +482,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
SRequestObj
*
pRequest
=
NULL
;
SQueryNode
*
pQueryNode
=
NULL
;
SArray
*
pNodeList
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
));
terrno
=
TSDB_CODE_SUCCESS
;
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
...
...
@@ -500,8 +491,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
if
(
qIsDdlQuery
(
pQueryNode
))
{
CHECK_CODE_GOTO
(
execDdlQuery
(
pRequest
,
pQueryNode
),
_return
);
}
else
{
CHECK_CODE_GOTO
(
getPlan
(
pRequest
,
pQueryNode
,
&
pRequest
->
body
.
pDag
),
_return
);
CHECK_CODE_GOTO
(
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
),
_return
);
CHECK_CODE_GOTO
(
getPlan
(
pRequest
,
pQueryNode
,
&
pRequest
->
body
.
pDag
,
pNodeList
),
_return
);
CHECK_CODE_GOTO
(
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
,
pNodeList
),
_return
);
pRequest
->
code
=
terrno
;
}
...
...
@@ -719,13 +710,17 @@ void* doFetchRow(SRequestObj* pRequest) {
return
NULL
;
}
int32_t
code
=
scheduleFetchRows
(
pRequest
->
body
.
pQueryJob
,
(
void
**
)
&
pRequest
->
body
.
resInfo
.
pData
);
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
int32_t
code
=
scheduleFetchRows
(
pRequest
->
body
.
pQueryJob
,
(
void
**
)
&
pResInfo
->
pData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
code
;
return
NULL
;
}
setQueryResultByRsp
(
&
pRequest
->
body
.
resInfo
,
(
SRetrieveTableRsp
*
)
pRequest
->
body
.
resInfo
.
pData
);
setQueryResultFromRsp
(
&
pRequest
->
body
.
resInfo
,
(
SRetrieveTableRsp
*
)
pResInfo
->
pData
);
tscDebug
(
"0x%"
PRIx64
" fetch results, numOfRows:%d total Rows:%"
PRId64
", complete:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pResInfo
->
numOfRows
,
pResInfo
->
totalRows
,
pResInfo
->
completed
,
pRequest
->
requestId
);
if
(
pResultInfo
->
numOfRows
==
0
)
{
return
NULL
;
}
...
...
@@ -855,7 +850,7 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
pthread_mutex_unlock
(
&
pTscObj
->
mutex
);
}
void
setQueryResult
By
Rsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
)
{
void
setQueryResult
From
Rsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
)
{
assert
(
pResultInfo
!=
NULL
&&
pRsp
!=
NULL
);
pResultInfo
->
pRspMsg
=
(
const
char
*
)
pRsp
;
...
...
@@ -864,5 +859,6 @@ void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* p
pResultInfo
->
current
=
0
;
pResultInfo
->
completed
=
(
pRsp
->
completed
==
1
);
pResultInfo
->
totalRows
+=
pResultInfo
->
numOfRows
;
setResultDataPtr
(
pResultInfo
,
pResultInfo
->
fields
,
pResultInfo
->
numOfCols
,
pResultInfo
->
numOfRows
);
}
source/libs/executor/src/executorimpl.c
浏览文件 @
1c00fd6b
...
...
@@ -5408,7 +5408,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
return
pOperator
;
}
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
SArray
*
pExprInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
SArray
*
pExprInfo
,
uint64_t
uid
,
SExecTaskInfo
*
pTaskInfo
)
{
SStreamBlockScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SStreamBlockScanInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -5428,6 +5428,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
// set the extract column id to streamHandle
tqReadHandleSetColIdList
((
STqReadHandle
*
)
streamReadHandle
,
pColList
);
tqReadHandleSetTbUid
(
streamReadHandle
,
uid
);
pInfo
->
readerHandle
=
streamReadHandle
;
...
...
@@ -7719,7 +7720,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
return
createExchangeOperatorInfo
(
pEx
->
pSrcEndPoints
,
pEx
->
node
.
pTargets
,
pTaskInfo
);
}
else
if
(
pPhyNode
->
info
.
type
==
OP_StreamScan
)
{
size_t
numOfCols
=
taosArrayGetSize
(
pPhyNode
->
pTargets
);
return
createStreamScanOperatorInfo
(
readerHandle
,
pPhyNode
->
pTargets
,
pTaskInfo
);
SScanPhyNode
*
pScanPhyNode
=
(
SScanPhyNode
*
)
pPhyNode
;
// simple child table.
return
createStreamScanOperatorInfo
(
readerHandle
,
pPhyNode
->
pTargets
,
pScanPhyNode
->
uid
,
pTaskInfo
);
}
}
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
1c00fd6b
...
...
@@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
*/
int32_t
queryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
);
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
,
uint64_t
requestId
);
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
,
SArray
*
pNodeList
,
uint64_t
requestId
);
void
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SDownstreamSource
*
pSource
);
int32_t
subPlanToString
(
const
SSubplan
*
pPhyNode
,
char
**
str
,
int32_t
*
len
);
int32_t
stringToSubplan
(
const
char
*
str
,
SSubplan
**
subplan
);
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
1c00fd6b
...
...
@@ -261,14 +261,14 @@ static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
return
;
}
static
uint64_t
splitSubplanByTable
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
)
{
SVgroupsInfo
*
pVgroupList
=
pTable
->
pMeta
->
vgroupList
;
static
uint64_t
splitSubplanByTable
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
Info
)
{
SVgroupsInfo
*
pVgroupList
=
pTable
Info
->
pMeta
->
vgroupList
;
for
(
int32_t
i
=
0
;
i
<
pVgroupList
->
numOfVgroups
;
++
i
)
{
STORE_CURRENT_SUBPLAN
(
pCxt
);
SSubplan
*
subplan
=
initSubplan
(
pCxt
,
QUERY_TYPE_SCAN
);
subplan
->
msgType
=
TDMT_VND_QUERY
;
vgroupMsgToEpSet
(
&
(
pTable
->
pMeta
->
vgroupList
->
vgroups
[
i
]),
&
subplan
->
execNode
);
subplan
->
pNode
=
createMultiTableScanNode
(
pPlanNode
,
pTable
);
vgroupMsgToEpSet
(
&
(
pTable
Info
->
pMeta
->
vgroupList
->
vgroups
[
i
]),
&
subplan
->
execNode
);
subplan
->
pNode
=
createMultiTableScanNode
(
pPlanNode
,
pTable
Info
);
subplan
->
pDataSink
=
createDataDispatcher
(
pCxt
,
pPlanNode
,
subplan
->
pNode
);
RECOVERY_CURRENT_SUBPLAN
(
pCxt
);
}
...
...
@@ -384,18 +384,19 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
subplan
->
msgType
=
TDMT_VND_QUERY
;
subplan
->
pNode
=
createPhyNode
(
pCxt
,
pRoot
);
subplan
->
pDataSink
=
createDataDispatcher
(
pCxt
,
pRoot
,
subplan
->
pNode
);
subplan
->
pDataSink
=
createDataDispatcher
(
pCxt
,
pRoot
,
subplan
->
pNode
);
}
// todo deal subquery
}
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
,
uint64_t
requestId
)
{
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
,
SArray
*
pNodeList
,
uint64_t
requestId
)
{
TRY
(
TSDB_MAX_TAG_CONDITIONS
)
{
SPlanContext
context
=
{
.
pCatalog
=
pCatalog
,
.
pDag
=
validPointer
(
calloc
(
1
,
sizeof
(
SQueryDag
))),
.
pCurrentSubplan
=
NULL
,
.
nextId
=
{.
queryId
=
requestId
},
//The unsigned Id starting from 1 would be better
.
nextId
=
{.
queryId
=
requestId
,
.
subplanId
=
1
,
.
templateId
=
1
},
};
*
pDag
=
context
.
pDag
;
...
...
@@ -408,6 +409,17 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
terrno
=
code
;
return
TSDB_CODE_FAILED
;
}
END_TRY
// traverse the dag again to acquire the execution node.
if
(
pNodeList
!=
NULL
)
{
SArray
**
pSubLevel
=
taosArrayGetLast
((
*
pDag
)
->
pSubplans
);
size_t
num
=
taosArrayGetSize
(
*
pSubLevel
);
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
SSubplan
*
pPlan
=
taosArrayGetP
(
*
pSubLevel
,
j
);
taosArrayPush
(
pNodeList
,
&
pPlan
->
execNode
);
}
}
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/planner/src/planner.c
浏览文件 @
1c00fd6b
...
...
@@ -16,6 +16,8 @@
#include "parser.h"
#include "plannerInt.h"
static
void
extractResSchema
(
struct
SQueryDag
*
const
*
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
);
static
void
destroyDataSinkNode
(
SDataSink
*
pSinkNode
)
{
if
(
pSinkNode
==
NULL
)
{
return
;
...
...
@@ -56,7 +58,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
tfree
(
pDag
);
}
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pNode
,
struct
SQueryDag
**
pDag
,
uint64_t
requestId
)
{
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pNode
,
struct
SQueryDag
**
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
,
SArray
*
pNodeList
,
uint64_t
requestId
)
{
SQueryPlanNode
*
pLogicPlan
;
int32_t
code
=
createQueryPlan
(
pNode
,
&
pLogicPlan
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
...
...
@@ -76,17 +78,31 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
return
code
;
}
code
=
createDag
(
pLogicPlan
,
NULL
,
pDag
,
requestId
);
code
=
createDag
(
pLogicPlan
,
NULL
,
pDag
,
pNodeList
,
requestId
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
destroyQueryPlan
(
pLogicPlan
);
qDestroyQueryDag
(
*
pDag
);
return
code
;
}
extractResSchema
(
pDag
,
pResSchema
,
numOfCols
);
destroyQueryPlan
(
pLogicPlan
);
return
TSDB_CODE_SUCCESS
;
}
void
extractResSchema
(
struct
SQueryDag
*
const
*
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
)
{
// extract the final result schema
SArray
*
pTopSubplan
=
taosArrayGetP
((
*
pDag
)
->
pSubplans
,
0
);
SSubplan
*
pPlan
=
taosArrayGetP
(
pTopSubplan
,
0
);
SDataBlockSchema
*
pDataBlockSchema
=
&
(
pPlan
->
pDataSink
->
schema
);
*
numOfCols
=
pDataBlockSchema
->
numOfCols
;
*
pResSchema
=
calloc
(
pDataBlockSchema
->
numOfCols
,
sizeof
(
SSchema
));
memcpy
((
*
pResSchema
),
pDataBlockSchema
->
pSchema
,
pDataBlockSchema
->
numOfCols
*
sizeof
(
SSchema
));
}
void
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SDownstreamSource
*
pSource
)
{
setSubplanExecutionNode
(
subplan
,
templateId
,
pSource
);
}
...
...
source/libs/planner/test/phyPlanTests.cpp
浏览文件 @
1c00fd6b
...
...
@@ -61,7 +61,7 @@ protected:
int32_t
run
()
{
SQueryDag
*
dag
=
nullptr
;
uint64_t
requestId
=
20
;
int32_t
code
=
createDag
(
logicPlan_
.
get
(),
nullptr
,
&
dag
,
requestId
);
int32_t
code
=
createDag
(
logicPlan_
.
get
(),
nullptr
,
&
dag
,
NULL
,
requestId
);
dag_
.
reset
(
dag
);
return
code
;
}
...
...
@@ -78,9 +78,9 @@ protected:
SQueryDag
*
dag
=
nullptr
;
uint64_t
requestId
=
20
;
SSchema
*
schema
=
NULL
;
u
int32_t
numOfOutput
=
0
;
int32_t
numOfOutput
=
0
;
code
=
qCreateQueryDag
(
query
,
&
dag
,
requestId
);
code
=
qCreateQueryDag
(
query
,
&
dag
,
&
schema
,
&
numOfOutput
,
nullptr
,
requestId
);
dag_
.
reset
(
dag
);
return
code
;
}
...
...
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
1c00fd6b
...
...
@@ -171,17 +171,17 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:
0x
%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
1c00fd6b
...
...
@@ -146,15 +146,15 @@ typedef struct SSchJob {
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
#define SCH_JOB_ELOG(param, ...) qError("QID:%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_ELOG(param, ...) qError("QID:
0x
%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:
0x
%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \
qError("QID:%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
qError("QID:
0x
%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
qDebug("QID:%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
qDebug("QID:
0x
%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
qWarn("QID:%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
qWarn("QID:
0x
%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
1c00fd6b
...
...
@@ -650,15 +650,11 @@ _return:
SCH_RET
(
code
);
}
int32_t
schProcessOnDataFetched
(
SSchJob
*
job
)
{
atomic_val_compare_exchange_32
(
&
job
->
remoteFetch
,
1
,
0
);
tsem_post
(
&
job
->
rspSem
);
}
// Note: no more error processing, handled in function internal
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
)
{
bool
needRetry
=
false
;
...
...
@@ -882,7 +878,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
SCH_ERR_JRET
(
schProcessOnDataFetched
(
pJob
));
break
;
}
case
TDMT_VND_DROP_TASK
:
{
...
...
@@ -892,7 +887,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
default:
SCH_TASK_ELOG
(
"unknown rsp msg, type:%d, status:%d"
,
msgType
,
SCH_GET_TASK_STATUS
(
pTask
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
...
...
@@ -935,8 +929,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
}
pTask
=
*
task
;
SCH_TASK_DLOG
(
"rsp msg received, type:%d, code:%x"
,
msgType
,
rspCode
);
SCH_TASK_DLOG
(
"rsp msg received, type:%s, code:%s"
,
TMSG_INFO
(
msgType
),
tstrerror
(
rspCode
));
SCH_ERR_JRET
(
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
));
...
...
@@ -1562,8 +1555,8 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
if
(
NULL
==
pJob
||
NULL
==
pData
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
int32_t
code
=
0
;
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
...
...
@@ -1609,7 +1602,6 @@ _return:
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
res
,
*
pData
,
NULL
))
{
continue
;
}
...
...
@@ -1628,8 +1620,7 @@ _return:
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
SCH_JOB_DLOG
(
"fetch done, code:%x"
,
code
);
SCH_JOB_DLOG
(
"fetch done, code:%s"
,
tstrerror
(
code
));
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
SCH_RET
(
code
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
1c00fd6b
...
...
@@ -17,6 +17,8 @@
#include "transComm.h"
#define CONN_PERSIST_TIME(para) (para * 1000 * 100)
typedef
struct
SCliConn
{
uv_connect_t
connReq
;
uv_stream_t
*
stream
;
...
...
@@ -42,7 +44,7 @@ typedef struct SCliThrdObj {
uv_loop_t
*
loop
;
uv_async_t
*
cliAsync
;
//
uv_timer_t
*
pTimer
;
void
*
cache
;
// conn pool
void
*
pool
;
// conn pool
queue
msg
;
pthread_mutex_t
msgMtx
;
uint64_t
nextTimeout
;
// next timeout
...
...
@@ -63,10 +65,10 @@ typedef struct SConnList {
// conn pool
// add expire timeout and capacity limit
static
void
*
conn
Cache
Create
(
int
size
);
static
void
*
conn
CacheDestroy
(
void
*
cache
);
static
SCliConn
*
getConnFrom
Cache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
);
static
void
addConnTo
Cache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
);
static
void
*
conn
Pool
Create
(
int
size
);
static
void
*
conn
PoolDestroy
(
void
*
pool
);
static
SCliConn
*
getConnFrom
Pool
(
void
*
pool
,
char
*
ip
,
uint32_t
port
);
static
void
addConnTo
Pool
(
void
*
pool
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
);
// register timer in each thread to clear expire conn
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
);
...
...
@@ -88,8 +90,14 @@ static void clientConnDestroy(SCliConn* pConn);
static
void
clientMsgDestroy
(
SCliMsg
*
pMsg
);
// thread obj
static
SCliThrdObj
*
createThrdObj
();
static
void
destroyThrdObj
(
SCliThrdObj
*
pThrd
);
// thread
static
void
*
clientThread
(
void
*
arg
);
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientProcessData
(
SCliConn
*
conn
)
{
STransConnCtx
*
pCtx
=
((
SCliMsg
*
)
conn
->
data
)
->
ctx
;
SRpcInfo
*
pRpc
=
pCtx
->
pRpc
;
...
...
@@ -101,19 +109,22 @@ static void clientProcessData(SCliConn* conn) {
(
pRpc
->
cfp
)(
NULL
,
&
rpcMsg
,
NULL
);
SCliThrdObj
*
pThrd
=
conn
->
hostThrd
;
addConnToCache
(
pThrd
->
cache
,
pCtx
->
ip
,
pCtx
->
port
,
conn
);
addConnToPool
(
pThrd
->
pool
,
pCtx
->
ip
,
pCtx
->
port
,
conn
);
if
(
!
uv_is_active
((
uv_handle_t
*
)
pThrd
->
pTimer
)
&&
pRpc
->
idleTime
>
0
)
{
uv_timer_start
((
uv_timer_t
*
)
pThrd
->
pTimer
,
clientTimeoutCb
,
CONN_PERSIST_TIME
(
pRpc
->
idleTime
)
/
2
,
0
);
}
free
(
pCtx
->
ip
);
free
(
pCtx
);
// impl
}
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
)
{
SCliThrdObj
*
pThrd
=
handle
->
data
;
SRpcInfo
*
pRpc
=
pThrd
->
pTransInst
;
int64_t
currentTime
=
pThrd
->
nextTimeout
;
tDebug
(
"timeout, try to remove expire conn from conn pool"
);
SConnList
*
p
=
taosHashIterate
((
SHashObj
*
)
pThrd
->
cache
,
NULL
);
SConnList
*
p
=
taosHashIterate
((
SHashObj
*
)
pThrd
->
pool
,
NULL
);
while
(
p
!=
NULL
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
p
->
conn
))
{
queue
*
h
=
QUEUE_HEAD
(
&
p
->
conn
);
...
...
@@ -125,18 +136,18 @@ static void clientTimeoutCb(uv_timer_t* handle) {
break
;
}
}
p
=
taosHashIterate
((
SHashObj
*
)
pThrd
->
cache
,
p
);
p
=
taosHashIterate
((
SHashObj
*
)
pThrd
->
pool
,
p
);
}
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
pRpc
->
idleTime
*
1000
*
10
;
uv_timer_start
(
handle
,
clientTimeoutCb
,
pRpc
->
idleTime
*
10
,
0
);
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pRpc
->
idleTime
)
;
uv_timer_start
(
handle
,
clientTimeoutCb
,
CONN_PERSIST_TIME
(
pRpc
->
idleTime
)
/
2
,
0
);
}
static
void
*
conn
Cache
Create
(
int
size
)
{
SHashObj
*
cache
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
return
cache
;
static
void
*
conn
Pool
Create
(
int
size
)
{
SHashObj
*
pool
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
return
pool
;
}
static
void
*
conn
CacheDestroy
(
void
*
cache
)
{
SConnList
*
connList
=
taosHashIterate
((
SHashObj
*
)
cache
,
NULL
);
static
void
*
conn
PoolDestroy
(
void
*
pool
)
{
SConnList
*
connList
=
taosHashIterate
((
SHashObj
*
)
pool
,
NULL
);
while
(
connList
!=
NULL
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
connList
->
conn
))
{
queue
*
h
=
QUEUE_HEAD
(
&
connList
->
conn
);
...
...
@@ -144,23 +155,22 @@ static void* connCacheDestroy(void* cache) {
SCliConn
*
c
=
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
clientConnDestroy
(
c
);
}
connList
=
taosHashIterate
((
SHashObj
*
)
cache
,
connList
);
connList
=
taosHashIterate
((
SHashObj
*
)
pool
,
connList
);
}
taosHashClear
(
cache
);
taosHashClear
(
pool
);
}
static
SCliConn
*
getConnFrom
Cache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
)
{
static
SCliConn
*
getConnFrom
Pool
(
void
*
pool
,
char
*
ip
,
uint32_t
port
)
{
char
key
[
128
]
=
{
0
};
tstrncpy
(
key
,
ip
,
strlen
(
ip
));
tstrncpy
(
key
+
strlen
(
key
),
(
char
*
)(
&
port
),
sizeof
(
port
));
SHashObj
*
p
Cache
=
cache
;
SConnList
*
plist
=
taosHashGet
(
p
Cache
,
key
,
strlen
(
key
));
SHashObj
*
p
Pool
=
pool
;
SConnList
*
plist
=
taosHashGet
(
p
Pool
,
key
,
strlen
(
key
));
if
(
plist
==
NULL
)
{
SConnList
list
;
plist
=
&
list
;
taosHashPut
(
pCache
,
key
,
strlen
(
key
),
plist
,
sizeof
(
*
plist
));
plist
=
taosHashGet
(
pCache
,
key
,
strlen
(
key
));
taosHashPut
(
pPool
,
key
,
strlen
(
key
),
(
void
*
)
&
list
,
sizeof
(
list
));
plist
=
taosHashGet
(
pPool
,
key
,
strlen
(
key
));
QUEUE_INIT
(
&
plist
->
conn
);
}
...
...
@@ -171,14 +181,14 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
QUEUE_REMOVE
(
h
);
return
QUEUE_DATA
(
h
,
SCliConn
,
conn
);
}
static
void
addConnTo
Cache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
)
{
static
void
addConnTo
Pool
(
void
*
pool
,
char
*
ip
,
uint32_t
port
,
SCliConn
*
conn
)
{
char
key
[
128
]
=
{
0
};
tstrncpy
(
key
,
ip
,
strlen
(
ip
));
tstrncpy
(
key
+
strlen
(
key
),
(
char
*
)(
&
port
),
sizeof
(
port
));
SRpcInfo
*
pRpc
=
((
SCliThrdObj
*
)
conn
->
hostThrd
)
->
pTransInst
;
conn
->
expireTime
=
taosGetTimestampMs
()
+
pRpc
->
idleTime
*
1000
*
10
;
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
cache
,
key
,
strlen
(
key
));
conn
->
expireTime
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pRpc
->
idleTime
)
;
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
// list already create before
assert
(
plist
!=
NULL
);
QUEUE_PUSH
(
&
plist
->
conn
,
&
conn
->
conn
);
...
...
@@ -322,7 +332,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
et
=
taosGetTimestampUs
();
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
SCliConn
*
conn
=
getConnFrom
Cache
(
pThrd
->
cache
,
pCtx
->
ip
,
pCtx
->
port
);
SCliConn
*
conn
=
getConnFrom
Pool
(
pThrd
->
pool
,
pCtx
->
ip
,
pCtx
->
port
);
if
(
conn
!=
NULL
)
{
// impl later
conn
->
data
=
pMsg
;
...
...
@@ -373,15 +383,14 @@ static void clientAsyncCb(uv_async_t* handle) {
SCliMsg
*
pMsg
=
QUEUE_DATA
(
h
,
SCliMsg
,
q
);
clientHandleReq
(
pMsg
,
pThrd
);
count
++
;
if
(
count
>=
2
)
{
tError
(
"send batch size: %d"
,
count
);
}
}
if
(
count
>=
2
)
{
tDebug
(
"already process batch size: %d"
,
count
);
}
}
static
void
*
clientThread
(
void
*
arg
)
{
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
arg
;
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
}
...
...
@@ -394,24 +403,8 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
cli
->
pThreadObj
=
(
SCliThrdObj
**
)
calloc
(
cli
->
numOfThreads
,
sizeof
(
SCliThrdObj
*
));
for
(
int
i
=
0
;
i
<
cli
->
numOfThreads
;
i
++
)
{
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
calloc
(
1
,
sizeof
(
SCliThrdObj
));
QUEUE_INIT
(
&
pThrd
->
msg
);
pthread_mutex_init
(
&
pThrd
->
msgMtx
,
NULL
);
pThrd
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
uv_loop_init
(
pThrd
->
loop
);
pThrd
->
cliAsync
=
malloc
(
sizeof
(
uv_async_t
));
uv_async_init
(
pThrd
->
loop
,
pThrd
->
cliAsync
,
clientAsyncCb
);
pThrd
->
cliAsync
->
data
=
pThrd
;
pThrd
->
pTimer
=
malloc
(
sizeof
(
uv_timer_t
));
uv_timer_init
(
pThrd
->
loop
,
pThrd
->
pTimer
);
pThrd
->
pTimer
->
data
=
pThrd
;
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
pRpc
->
idleTime
*
1000
*
10
;
pThrd
->
cache
=
connCacheCreate
(
1
);
SCliThrdObj
*
pThrd
=
createThrdObj
();
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pRpc
->
idleTime
);
pThrd
->
pTransInst
=
shandle
;
int
err
=
pthread_create
(
&
pThrd
->
thread
,
NULL
,
clientThread
,
(
void
*
)(
pThrd
));
...
...
@@ -426,16 +419,42 @@ static void clientMsgDestroy(SCliMsg* pMsg) {
// impl later
free
(
pMsg
);
}
static
SCliThrdObj
*
createThrdObj
()
{
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
calloc
(
1
,
sizeof
(
SCliThrdObj
));
QUEUE_INIT
(
&
pThrd
->
msg
);
pthread_mutex_init
(
&
pThrd
->
msgMtx
,
NULL
);
pThrd
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
uv_loop_init
(
pThrd
->
loop
);
pThrd
->
cliAsync
=
malloc
(
sizeof
(
uv_async_t
));
uv_async_init
(
pThrd
->
loop
,
pThrd
->
cliAsync
,
clientAsyncCb
);
pThrd
->
cliAsync
->
data
=
pThrd
;
pThrd
->
pTimer
=
malloc
(
sizeof
(
uv_timer_t
));
uv_timer_init
(
pThrd
->
loop
,
pThrd
->
pTimer
);
pThrd
->
pTimer
->
data
=
pThrd
;
pThrd
->
pool
=
connPoolCreate
(
1
);
return
pThrd
;
}
static
void
destroyThrdObj
(
SCliThrdObj
*
pThrd
)
{
if
(
pThrd
==
NULL
)
{
return
;
}
pthread_join
(
pThrd
->
thread
,
NULL
);
pthread_mutex_destroy
(
&
pThrd
->
msgMtx
);
free
(
pThrd
->
cliAsync
);
free
(
pThrd
->
loop
);
free
(
pThrd
);
}
//
void
taosCloseClient
(
void
*
arg
)
{
// impl later
SClientObj
*
cli
=
arg
;
for
(
int
i
=
0
;
i
<
cli
->
numOfThreads
;
i
++
)
{
SCliThrdObj
*
pThrd
=
cli
->
pThreadObj
[
i
];
pthread_join
(
pThrd
->
thread
,
NULL
);
pthread_mutex_destroy
(
&
pThrd
->
msgMtx
);
free
(
pThrd
->
cliAsync
);
free
(
pThrd
->
loop
);
free
(
pThrd
);
destroyThrdObj
(
cli
->
pThreadObj
[
i
]);
}
free
(
cli
->
pThreadObj
);
free
(
cli
);
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
1c00fd6b
...
...
@@ -91,10 +91,14 @@ static SConn* connCreate();
static
void
connDestroy
(
SConn
*
conn
);
static
void
uvConnDestroy
(
uv_handle_t
*
handle
);
// server
worke
thread
// server
and worker
thread
static
void
*
workerThread
(
void
*
arg
);
static
void
*
acceptThread
(
void
*
arg
);
// add handle loop
static
bool
addHandleToWorkloop
(
void
*
arg
);
static
bool
addHandleToAcceptloop
(
void
*
arg
);
void
uvAllocReadBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
/*
* formate of data buffer:
...
...
@@ -268,7 +272,7 @@ static void uvProcessData(SConn* pConn) {
rpcMsg
.
handle
=
pConn
;
(
*
(
pRpc
->
cfp
))(
pRpc
->
parent
,
&
rpcMsg
,
NULL
);
uv_timer_start
(
pConn
->
pTimer
,
uvHandleActivityTimeout
,
pRpc
->
idleTime
*
10000
,
0
);
//
uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
// validate msg type
}
...
...
@@ -451,22 +455,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
void
*
acceptThread
(
void
*
arg
)
{
// opt
SServerObj
*
srv
=
(
SServerObj
*
)
arg
;
uv_tcp_init
(
srv
->
loop
,
&
srv
->
server
);
struct
sockaddr_in
bind_addr
;
uv_ip4_addr
(
"0.0.0.0"
,
srv
->
port
,
&
bind_addr
);
uv_tcp_bind
(
&
srv
->
server
,
(
const
struct
sockaddr
*
)
&
bind_addr
,
0
);
int
err
=
0
;
if
((
err
=
uv_listen
((
uv_stream_t
*
)
&
srv
->
server
,
128
,
uvOnAcceptCb
))
!=
0
)
{
tError
(
"Listen error %s
\n
"
,
uv_err_name
(
err
));
return
NULL
;
}
uv_run
(
srv
->
loop
,
UV_RUN_DEFAULT
);
}
static
void
initWorkThrdObj
(
SWorkThrdObj
*
pThrd
)
{
static
bool
addHandleToWorkloop
(
void
*
arg
)
{
SWorkThrdObj
*
pThrd
=
arg
;
pThrd
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
uv_loop_init
(
pThrd
->
loop
);
if
(
0
!=
uv_loop_init
(
pThrd
->
loop
))
{
return
false
;
}
// SRpcInfo* pRpc = pThrd->shandle;
uv_pipe_init
(
pThrd
->
loop
,
pThrd
->
pipe
,
1
);
...
...
@@ -482,6 +478,31 @@ static void initWorkThrdObj(SWorkThrdObj* pThrd) {
pThrd
->
workerAsync
->
data
=
pThrd
;
uv_read_start
((
uv_stream_t
*
)
pThrd
->
pipe
,
uvAllocConnBufferCb
,
uvOnConnectionCb
);
return
true
;
}
static
bool
addHandleToAcceptloop
(
void
*
arg
)
{
// impl later
SServerObj
*
srv
=
arg
;
int
err
=
0
;
if
((
err
=
uv_tcp_init
(
srv
->
loop
,
&
srv
->
server
))
!=
0
)
{
tError
(
"failed to init accept server: %s"
,
uv_err_name
(
err
));
return
false
;
}
struct
sockaddr_in
bind_addr
;
uv_ip4_addr
(
"0.0.0.0"
,
srv
->
port
,
&
bind_addr
);
if
((
err
=
uv_tcp_bind
(
&
srv
->
server
,
(
const
struct
sockaddr
*
)
&
bind_addr
,
0
))
!=
0
)
{
tError
(
"failed to bind: %s"
,
uv_err_name
(
err
));
return
false
;
}
if
((
err
=
uv_listen
((
uv_stream_t
*
)
&
srv
->
server
,
128
,
uvOnAcceptCb
))
!=
0
)
{
tError
(
"failed to listen: %s"
,
uv_err_name
(
err
));
return
false
;
}
return
true
;
}
void
*
workerThread
(
void
*
arg
)
{
SWorkThrdObj
*
pThrd
=
(
SWorkThrdObj
*
)
arg
;
...
...
@@ -546,11 +567,12 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
SWorkThrdObj
*
thrd
=
(
SWorkThrdObj
*
)
calloc
(
1
,
sizeof
(
SWorkThrdObj
));
srv
->
pThreadObj
[
i
]
=
thrd
;
srv
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
calloc
(
2
,
sizeof
(
uv_pipe_t
));
int
fds
[
2
];
if
(
uv_socketpair
(
AF_UNIX
,
SOCK_STREAM
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
return
NULL
;
goto
End
;
}
uv_pipe_init
(
srv
->
loop
,
&
(
srv
->
pipe
[
i
][
0
]),
1
);
uv_pipe_open
(
&
(
srv
->
pipe
[
i
][
0
]),
fds
[
1
]);
// init write
...
...
@@ -559,7 +581,9 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
thrd
->
fd
=
fds
[
0
];
thrd
->
pipe
=
&
(
srv
->
pipe
[
i
][
1
]);
// init read
initWorkThrdObj
(
thrd
);
if
(
false
==
addHandleToWorkloop
(
thrd
))
{
goto
End
;
}
int
err
=
pthread_create
(
&
(
thrd
->
thread
),
NULL
,
workerThread
,
(
void
*
)(
thrd
));
if
(
err
==
0
)
{
tDebug
(
"sucess to create worker-thread %d"
,
i
);
...
...
@@ -568,9 +592,10 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
// TODO: clear all other resource later
tError
(
"failed to create worker-thread %d"
,
i
);
}
srv
->
pThreadObj
[
i
]
=
thrd
;
}
if
(
false
==
addHandleToAcceptloop
(
srv
))
{
goto
End
;
}
int
err
=
pthread_create
(
&
srv
->
thread
,
NULL
,
acceptThread
,
(
void
*
)
srv
);
if
(
err
==
0
)
{
tDebug
(
"success to create accept-thread"
);
...
...
@@ -579,16 +604,25 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
}
return
srv
;
End:
taosCloseServer
(
srv
);
return
NULL
;
}
void
destroyWorkThrd
(
SWorkThrdObj
*
pThrd
)
{
if
(
pThrd
==
NULL
)
{
return
;
}
pthread_join
(
pThrd
->
thread
,
NULL
);
// free(srv->pipe[i]);
free
(
pThrd
->
loop
);
free
(
pThrd
);
}
void
taosCloseServer
(
void
*
arg
)
{
// impl later
SServerObj
*
srv
=
arg
;
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
SWorkThrdObj
*
pThrd
=
srv
->
pThreadObj
[
i
];
pthread_join
(
pThrd
->
thread
,
NULL
);
free
(
srv
->
pipe
[
i
]);
free
(
pThrd
->
loop
);
free
(
pThrd
);
destroyWorkThrd
(
srv
->
pThreadObj
[
i
]);
}
free
(
srv
->
loop
);
free
(
srv
->
pipe
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录