Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1832a509
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1832a509
编写于
1月 19, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
[td-11818] select *
上级
6d9ae1af
8b808fbc
变更
25
展开全部
隐藏空白更改
内联
并排
Showing
25 changed file
with
1353 addition
and
562 deletion
+1353
-562
include/libs/planner/planner.h
include/libs/planner/planner.h
+9
-3
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+8
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+2
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+7
-4
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+30
-29
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+129
-32
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+1
-1
source/libs/planner/src/logicPlan.c
source/libs/planner/src/logicPlan.c
+1
-1
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+7
-6
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+61
-18
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+2
-2
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+145
-37
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+253
-11
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+120
-0
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+48
-49
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+75
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+198
-0
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+117
-0
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+119
-352
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+4
-2
source/libs/transport/test/transportTests.cc
source/libs/transport/test/transportTests.cc
+2
-1
source/util/src/terror.c
source/util/src/terror.c
+1
-0
tests/script/sh/massiveTable/deployCluster.sh
tests/script/sh/massiveTable/deployCluster.sh
+9
-9
tests/script/sh/massiveTable/setupDnodes.sh
tests/script/sh/massiveTable/setupDnodes.sh
+4
-3
未找到文件。
include/libs/planner/planner.h
浏览文件 @
1832a509
...
...
@@ -114,10 +114,16 @@ typedef struct SProjectPhyNode {
SPhyNode
node
;
}
SProjectPhyNode
;
typedef
struct
SDownstreamSource
{
SQueryNodeAddr
addr
;
uint64_t
taskId
;
uint64_t
schedId
;
}
SDownstreamSource
;
typedef
struct
SExchangePhyNode
{
SPhyNode
node
;
uint64_t
srcTemplateId
;
// template id of datasource suplans
SArray
*
pSrcEndPoints
;
// S
EpAddr
, scheduler fill by calling qSetSuplanExecutionNode
uint64_t
srcTemplateId
;
// template id of datasource suplans
SArray
*
pSrcEndPoints
;
// S
Array<SDownstreamSource>
, scheduler fill by calling qSetSuplanExecutionNode
}
SExchangePhyNode
;
typedef
enum
EAggAlgo
{
...
...
@@ -178,7 +184,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag**
// @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan
// @ep one execution location of this group of datasource subplans
void
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
QueryNodeAddr
*
ep
);
void
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
DownstreamSource
*
pSource
);
int32_t
qExplainQuery
(
const
struct
SQueryNode
*
pQueryInfo
,
struct
SEpSet
*
pQnode
,
char
**
str
);
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
1832a509
...
...
@@ -114,6 +114,14 @@ void schedulerDestroy(void);
*/
int32_t
schedulerConvertDagToTaskList
(
SQueryDag
*
pDag
,
SArray
**
pTasks
);
/**
* make one task info's multiple copies
* @param src
* @param dst SArray**<STaskInfo>
* @return
*/
int32_t
schedulerCopyTask
(
STaskInfo
*
src
,
SArray
**
dst
,
int32_t
copyNum
);
void
schedulerFreeTaskList
(
SArray
*
taskList
);
...
...
include/util/taoserror.h
浏览文件 @
1832a509
...
...
@@ -361,6 +361,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping")
#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation")
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error")
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A) //"Job freed")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired")
...
...
source/client/src/clientImpl.c
浏览文件 @
1832a509
...
...
@@ -259,9 +259,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
SArray
*
execNode
=
taosArrayInit
(
4
,
sizeof
(
SQueryNodeAddr
));
SQueryNodeAddr
addr
=
{.
numOfEps
=
1
,
.
inUse
=
0
,
.
nodeId
=
0
};
SQueryNodeAddr
addr
=
{.
numOfEps
=
1
,
.
inUse
=
0
,
.
nodeId
=
1
};
addr
.
epAddr
[
0
].
port
=
6030
;
strcpy
(
addr
.
epAddr
[
0
].
fqdn
,
"
ubuntu
"
);
strcpy
(
addr
.
epAddr
[
0
].
fqdn
,
"
localhost
"
);
taosArrayPush
(
execNode
,
&
addr
);
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
execNode
,
pDag
,
&
pRequest
->
body
.
pQueryJob
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
1832a509
...
...
@@ -375,9 +375,12 @@ typedef struct STaskParam {
}
STaskParam
;
typedef
struct
SExchangeInfo
{
int32_t
numOfSources
;
SEpSet
*
pEpset
;
int32_t
bytes
;
// total load bytes from remote
SArray
*
pSources
;
int32_t
bytes
;
// total load bytes from remote
tsem_t
ready
;
void
*
pTransporter
;
SRetrieveTableRsp
*
pRsp
;
SSDataBlock
*
pResult
;
}
SExchangeInfo
;
typedef
struct
STableScanInfo
{
...
...
@@ -545,7 +548,7 @@ typedef struct SOrderOperatorInfo {
void
appendUpstream
(
SOperatorInfo
*
p
,
SOperatorInfo
*
pUpstream
);
SOperatorInfo
*
createExchangeOperatorInfo
(
const
S
VgroupInfo
*
pVgroups
,
int32_t
numOfSources
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createExchangeOperatorInfo
(
const
S
Array
*
pSources
,
const
SArray
*
pSchema
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createDataBlocksOptScanInfo
(
void
*
pTsdbReadHandle
,
int32_t
order
,
int32_t
numOfOutput
,
int32_t
repeatTime
,
int32_t
reverseTime
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pTsdbReadHandle
,
int32_t
order
,
int32_t
numOfOutput
,
int32_t
repeatTime
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
1832a509
...
...
@@ -79,42 +79,43 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
int32_t
tableType
=
0
;
SPhyNode
*
pPhyNode
=
pSubplan
->
pNode
;
if
(
pPhyNode
->
info
.
type
==
OP_TableScan
||
pPhyNode
->
info
.
type
==
OP_DataBlocksOptScan
)
{
STableGroupInfo
groupInfo
=
{
0
};
int32_t
type
=
pPhyNode
->
info
.
type
;
if
(
type
==
OP_TableScan
||
type
==
OP_DataBlocksOptScan
)
{
STableScanPhyNode
*
pTableScanNode
=
(
STableScanPhyNode
*
)
pPhyNode
;
uid
=
pTableScanNode
->
scan
.
uid
;
window
=
pTableScanNode
->
window
;
uid
=
pTableScanNode
->
scan
.
uid
;
window
=
pTableScanNode
->
window
;
tableType
=
pTableScanNode
->
scan
.
tableType
;
}
else
{
assert
(
0
);
}
STableGroupInfo
groupInfo
=
{
0
};
if
(
tableType
==
TSDB_SUPER_TABLE
)
{
code
=
tsdbQuerySTableByTagCond
(
tsdb
,
uid
,
window
.
skey
,
NULL
,
0
,
0
,
NULL
,
&
groupInfo
,
NULL
,
0
,
pSubplan
->
id
.
queryId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
}
else
{
// Create one table group.
groupInfo
.
numOfTables
=
1
;
groupInfo
.
pGroupList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
if
(
tableType
==
TSDB_SUPER_TABLE
)
{
code
=
tsdbQuerySTableByTagCond
(
tsdb
,
uid
,
window
.
skey
,
NULL
,
0
,
0
,
NULL
,
&
groupInfo
,
NULL
,
0
,
pSubplan
->
id
.
queryId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
}
else
{
// Create one table group.
groupInfo
.
numOfTables
=
1
;
groupInfo
.
pGroupList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
pa
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
SArray
*
pa
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
STableKeyInfo
info
=
{.
pTable
=
NULL
,
.
lastKey
=
0
,
.
uid
=
uid
};
taosArrayPush
(
pa
,
&
info
);
taosArrayPush
(
groupInfo
.
pGroupList
,
&
pa
);
}
STableKeyInfo
info
=
{.
pTable
=
NULL
,
.
lastKey
=
0
,
.
uid
=
uid
};
taosArrayPush
(
pa
,
&
info
);
taosArrayPush
(
groupInfo
.
pGroupList
,
&
pa
);
}
if
(
groupInfo
.
numOfTables
==
0
)
{
code
=
0
;
// qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId);
goto
_error
;
if
(
groupInfo
.
numOfTables
==
0
)
{
code
=
0
;
// qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId);
goto
_error
;
}
}
code
=
doCreateExecTaskInfo
(
pSubplan
,
pTask
,
&
groupInfo
,
tsdb
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
doCreateExecTaskInfo
(
pSubplan
,
pTask
,
&
groupInfo
,
tsdb
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
SDataSinkMgtCfg
cfg
=
{.
maxDataBlockNum
=
1000
,
.
maxDataBlockNumPerQuery
=
100
};
code
=
dsDataSinkMgtInit
(
&
cfg
);
...
...
@@ -182,7 +183,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
// todo: remove it.
if
(
tinfo
==
NULL
)
{
return
NULL
;
return
TSDB_CODE_SUCCESS
;
}
*
pRes
=
NULL
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
1832a509
...
...
@@ -4914,7 +4914,41 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
}
int32_t
loadRemoteDataCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SExchangeInfo
*
pEx
=
(
SExchangeInfo
*
)
param
;
pEx
->
pRsp
=
pMsg
->
pData
;
pEx
->
pRsp
->
numOfRows
=
htonl
(
pEx
->
pRsp
->
numOfRows
);
pEx
->
pRsp
->
useconds
=
htobe64
(
pEx
->
pRsp
->
useconds
);
pEx
->
pRsp
->
compLen
=
htonl
(
pEx
->
pRsp
->
compLen
);
tsem_post
(
&
pEx
->
ready
);
}
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
)
{
assert
(
pMsgBody
!=
NULL
);
tfree
(
pMsgBody
->
msgInfo
.
pData
);
tfree
(
pMsgBody
);
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
ahandle
;
assert
(
pMsg
->
ahandle
!=
NULL
);
SDataBuf
buf
=
{.
len
=
pMsg
->
contLen
,
.
pData
=
NULL
};
if
(
pMsg
->
contLen
>
0
)
{
buf
.
pData
=
calloc
(
1
,
pMsg
->
contLen
);
if
(
buf
.
pData
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
pMsg
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
memcpy
(
buf
.
pData
,
pMsg
->
pCont
,
pMsg
->
contLen
);
}
}
pSendInfo
->
fp
(
pSendInfo
->
param
,
&
buf
,
pMsg
->
code
);
rpcFreeCont
(
pMsg
->
pCont
);
destroySendMsgInfo
(
pSendInfo
);
}
static
SSDataBlock
*
doLoadRemoteData
(
void
*
param
,
bool
*
newgroup
)
{
...
...
@@ -4925,46 +4959,66 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
*
newgroup
=
false
;
if
(
pExchangeInfo
->
pRsp
!=
NULL
&&
pExchangeInfo
->
pRsp
->
completed
==
1
)
{
return
NULL
;
}
SResFetchReq
*
pMsg
=
calloc
(
1
,
sizeof
(
SResFetchReq
));
if
(
NULL
==
pMsg
)
{
// todo handle malloc error
}
SEpSet
epSet
;
SDownstreamSource
*
pSource
=
taosArrayGet
(
pExchangeInfo
->
pSources
,
0
);
SEpSet
epSet
=
{
0
};
int64_t
sId
=
-
1
,
queryId
=
0
,
taskId
=
1
,
vgId
=
1
;
pMsg
->
header
.
vgId
=
htonl
(
vgId
);
epSet
.
numOfEps
=
pSource
->
addr
.
numOfEps
;
epSet
.
port
[
0
]
=
pSource
->
addr
.
epAddr
[
0
].
port
;
tstrncpy
(
epSet
.
fqdn
[
0
],
pSource
->
addr
.
epAddr
[
0
].
fqdn
,
tListLen
(
epSet
.
fqdn
[
0
]));
pMsg
->
sId
=
htobe64
(
sId
);
pMsg
->
taskId
=
htobe64
(
taskId
);
pMsg
->
queryId
=
htobe64
(
queryId
);
pMsg
->
header
.
vgId
=
htonl
(
pSource
->
addr
.
nodeId
);
pMsg
->
sId
=
htobe64
(
pSource
->
schedId
);
pMsg
->
taskId
=
htobe64
(
pSource
->
taskId
);
pMsg
->
queryId
=
htobe64
(
pTaskInfo
->
id
.
queryId
);
// send the fetch remote task result reques
SMsgSendInfo
*
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
NULL
==
pMsgSendInfo
)
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRIx64
" calloc %d failed"
,
queryId
,
taskId
,
(
int32_t
)
sizeof
(
SMsgSendInfo
));
qError
(
"QID:%"
PRIx64
" calloc %d failed"
,
GET_TASKID
(
pTaskInfo
)
,
(
int32_t
)
sizeof
(
SMsgSendInfo
));
}
pMsgSendInfo
->
param
=
NULL
;
pMsgSendInfo
->
param
=
pExchangeInfo
;
pMsgSendInfo
->
msgInfo
.
pData
=
pMsg
;
pMsgSendInfo
->
msgInfo
.
len
=
sizeof
(
SResFetchReq
);
pMsgSendInfo
->
msgType
=
TDMT_VND_FETCH
;
pMsgSendInfo
->
fp
=
loadRemoteDataCallback
;
int64_t
transporterId
=
0
;
void
*
pTransporter
=
NULL
;
int32_t
code
=
asyncSendMsgToServer
(
pTransporter
,
&
epSet
,
&
transporterId
,
pMsgSendInfo
);
int32_t
code
=
asyncSendMsgToServer
(
pExchangeInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
pMsgSendInfo
);
printf
(
"abc
\n
"
);
getchar
();
tsem_wait
(
&
pExchangeInfo
->
ready
);
// add it into the sink node
if
(
pExchangeInfo
->
pRsp
->
numOfRows
==
0
)
{
return
NULL
;
}
}
SSDataBlock
*
pRes
=
pExchangeInfo
->
pResult
;
char
*
pData
=
pExchangeInfo
->
pRsp
->
data
;
for
(
int32_t
i
=
0
;
i
<
pOperator
->
numOfOutput
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pRes
->
pDataBlock
,
i
);
char
*
tmp
=
realloc
(
pColInfoData
->
pData
,
pColInfoData
->
info
.
bytes
*
pExchangeInfo
->
pRsp
->
numOfRows
);
if
(
tmp
==
NULL
)
{
// todo
}
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SVgroupInfo
*
pVgroups
,
int32_t
numOfSources
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
assert
(
numOfSources
>
0
);
size_t
len
=
pExchangeInfo
->
pRsp
->
numOfRows
*
pColInfoData
->
info
.
bytes
;
memcpy
(
tmp
,
pData
,
len
);
pData
+=
len
;
}
return
pExchangeInfo
->
pResult
;
}
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SArray
*
pSources
,
const
SArray
*
pExprInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
SExchangeInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
SExchangeInfo
));
SOperatorInfo
*
pOperator
=
calloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -4975,18 +5029,57 @@ SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t n
return
NULL
;
}
pInfo
->
numOfSources
=
numOfSources
;
pInfo
->
pSources
=
taosArrayDup
(
pSources
);
assert
(
taosArrayGetSize
(
pInfo
->
pSources
)
>
0
);
size_t
size
=
taosArrayGetSize
(
pExprInfo
);
pInfo
->
pResult
=
calloc
(
1
,
sizeof
(
SSDataBlock
));
pInfo
->
pResult
->
pDataBlock
=
taosArrayInit
(
pOperator
->
numOfOutput
,
sizeof
(
SColumnInfoData
));
SArray
*
pResult
=
pInfo
->
pResult
->
pDataBlock
;
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SColumnInfoData
colInfoData
=
{
0
};
SExprInfo
*
p
=
taosArrayGetP
(
pExprInfo
,
i
);
SSchema
*
pSchema
=
&
p
->
base
.
resSchema
;
colInfoData
.
info
.
type
=
pSchema
->
type
;
colInfoData
.
info
.
colId
=
pSchema
->
colId
;
colInfoData
.
info
.
bytes
=
pSchema
->
bytes
;
taosArrayPush
(
pResult
,
&
colInfoData
);
}
pOperator
->
name
=
"ExchangeOperator"
;
pOperator
->
operatorType
=
OP_Exchange
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
numOfOutput
=
size
;
pOperator
->
pRuntimeEnv
=
NULL
;
pOperator
->
exec
=
doLoadRemoteData
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
{
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"TSC"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
processMsgFromServer
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
"root"
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
ckey
=
"key"
;
// rpcInit.spi = 1;
rpcInit
.
secret
=
(
char
*
)
"dcc5bed04851fec854c035b2e40263b6"
;
pInfo
->
pTransporter
=
rpcOpen
(
&
rpcInit
);
if
(
pInfo
->
pTransporter
==
NULL
)
{
return
NULL
;
// todo
}
}
return
pOperator
;
}
...
...
@@ -5016,7 +5109,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
NULL
;
pOperator
->
exec
=
doTableScan
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
...
...
@@ -5049,7 +5141,6 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order,
pOperator
->
status
=
OP_IN_EXECUTING
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pRuntimeEnv
=
NULL
;
pOperator
->
exec
=
doTableScan
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
...
...
@@ -7363,6 +7454,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
SScanPhyNode
*
pScanPhyNode
=
(
SScanPhyNode
*
)
pPhyNode
;
size_t
numOfCols
=
taosArrayGetSize
(
pPhyNode
->
pTargets
);
return
createDataBlocksOptScanInfo
(
param
,
pScanPhyNode
->
order
,
numOfCols
,
pScanPhyNode
->
count
,
pScanPhyNode
->
reverse
,
pTaskInfo
);
}
else
if
(
pPhyNode
->
info
.
type
==
OP_Exchange
)
{
SExchangePhyNode
*
pEx
=
(
SExchangePhyNode
*
)
pPhyNode
;
return
createExchangeOperatorInfo
(
pEx
->
pSrcEndPoints
,
pEx
->
node
.
pTargets
,
pTaskInfo
);
}
else
{
assert
(
0
);
}
...
...
@@ -7372,32 +7466,35 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
int32_t
doCreateExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
STableGroupInfo
*
pGroupInfo
,
void
*
readerHandle
)
{
STsdbQueryCond
cond
=
{.
loadExternalRows
=
false
};
tsdbReadHandleT
tsdbReadHandle
=
NULL
;
SPhyNode
*
pPhyNode
=
pPlan
->
pNode
;
if
(
pPhyNode
->
info
.
type
==
OP_TableScan
||
pPhyNode
->
info
.
type
==
OP_DataBlocksOptScan
)
{
STableScanPhyNode
*
pTableScanNode
=
(
STableScanPhyNode
*
)
pPhyNode
;
cond
.
order
=
pTableScanNode
->
scan
.
order
;
STableScanPhyNode
*
pTableScanNode
=
(
STableScanPhyNode
*
)
pPhyNode
;
cond
.
order
=
pTableScanNode
->
scan
.
order
;
cond
.
numOfCols
=
taosArrayGetSize
(
pTableScanNode
->
scan
.
node
.
pTargets
);
cond
.
colList
=
calloc
(
cond
.
numOfCols
,
sizeof
(
SColumnInfo
));
cond
.
twindow
=
pTableScanNode
->
window
;
cond
.
type
=
BLOCK_LOAD_OFFSET_SEQ_ORDER
;
cond
.
colList
=
calloc
(
cond
.
numOfCols
,
sizeof
(
SColumnInfo
));
cond
.
twindow
=
pTableScanNode
->
window
;
cond
.
type
=
BLOCK_LOAD_OFFSET_SEQ_ORDER
;
for
(
int32_t
i
=
0
;
i
<
cond
.
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
cond
.
numOfCols
;
++
i
)
{
SExprInfo
*
pExprInfo
=
taosArrayGetP
(
pTableScanNode
->
scan
.
node
.
pTargets
,
i
);
assert
(
pExprInfo
->
pExpr
->
nodeType
==
TEXPR_COL_NODE
);
SSchema
*
pSchema
=
pExprInfo
->
pExpr
->
pSchema
;
cond
.
colList
[
i
].
type
=
pSchema
->
type
;
cond
.
colList
[
i
].
type
=
pSchema
->
type
;
cond
.
colList
[
i
].
bytes
=
pSchema
->
bytes
;
cond
.
colList
[
i
].
colId
=
pSchema
->
colId
;
}
*
pTaskInfo
=
createExecTaskInfo
((
uint64_t
)
pPlan
->
id
.
queryId
);
tsdbReadHandle
=
tsdbQueryTables
(
readerHandle
,
&
cond
,
pGroupInfo
,
(
*
pTaskInfo
)
->
id
.
queryId
,
NULL
);
}
else
if
(
pPhyNode
->
info
.
type
==
OP_Exchange
)
{
*
pTaskInfo
=
createExecTaskInfo
((
uint64_t
)
pPlan
->
id
.
queryId
);
}
else
{
assert
(
0
);
}
*
pTaskInfo
=
createExecTaskInfo
((
uint64_t
)
pPlan
->
id
.
queryId
);
tsdbReadHandleT
tsdbReadHandle
=
tsdbQueryTables
(
readerHandle
,
&
cond
,
pGroupInfo
,
(
*
pTaskInfo
)
->
id
.
queryId
,
NULL
);
(
*
pTaskInfo
)
->
pRoot
=
doCreateOperatorTreeNode
(
pPlan
->
pNode
,
*
pTaskInfo
,
tsdbReadHandle
);
if
((
*
pTaskInfo
)
->
pRoot
==
NULL
)
{
return
terrno
;
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
1832a509
...
...
@@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
int32_t
queryPlanToSql
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
sql
);
int32_t
createDag
(
SQueryPlanNode
*
pQueryNode
,
struct
SCatalog
*
pCatalog
,
SQueryDag
**
pDag
,
uint64_t
requestId
);
void
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
QueryNodeAddr
*
ep
);
void
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
DownstreamSource
*
pSource
);
int32_t
subPlanToString
(
const
SSubplan
*
pPhyNode
,
char
**
str
,
int32_t
*
len
);
int32_t
stringToSubplan
(
const
char
*
str
,
SSubplan
**
subplan
);
...
...
source/libs/planner/src/logicPlan.c
浏览文件 @
1832a509
...
...
@@ -13,9 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "function.h"
#include "os.h"
#include "parser.h"
#include "function.h"
#include "plannerInt.h"
typedef
struct
SFillEssInfo
{
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
1832a509
...
...
@@ -278,7 +278,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
static
SPhyNode
*
createExchangeNode
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
,
uint64_t
srcTemplateId
)
{
SExchangePhyNode
*
node
=
(
SExchangePhyNode
*
)
initPhyNode
(
pPlanNode
,
OP_Exchange
,
sizeof
(
SExchangePhyNode
));
node
->
srcTemplateId
=
srcTemplateId
;
node
->
pSrcEndPoints
=
validPointer
(
taosArrayInit
(
TARRAY_MIN_SIZE
,
sizeof
(
S
QueryNodeAddr
)));
node
->
pSrcEndPoints
=
validPointer
(
taosArrayInit
(
TARRAY_MIN_SIZE
,
sizeof
(
S
DownstreamSource
)));
return
(
SPhyNode
*
)
node
;
}
...
...
@@ -409,24 +409,25 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
return
TSDB_CODE_SUCCESS
;
}
void
setExchangSourceNode
(
uint64_t
templateId
,
S
QueryNodeAddr
*
pEp
,
SPhyNode
*
pNode
)
{
void
setExchangSourceNode
(
uint64_t
templateId
,
S
DownstreamSource
*
pSource
,
SPhyNode
*
pNode
)
{
if
(
NULL
==
pNode
)
{
return
;
}
if
(
OP_Exchange
==
pNode
->
info
.
type
)
{
SExchangePhyNode
*
pExchange
=
(
SExchangePhyNode
*
)
pNode
;
if
(
templateId
==
pExchange
->
srcTemplateId
)
{
taosArrayPush
(
pExchange
->
pSrcEndPoints
,
p
Ep
);
taosArrayPush
(
pExchange
->
pSrcEndPoints
,
p
Source
);
}
}
if
(
pNode
->
pChildren
!=
NULL
)
{
size_t
size
=
taosArrayGetSize
(
pNode
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
setExchangSourceNode
(
templateId
,
p
Ep
,
taosArrayGetP
(
pNode
->
pChildren
,
i
));
setExchangSourceNode
(
templateId
,
p
Source
,
taosArrayGetP
(
pNode
->
pChildren
,
i
));
}
}
}
void
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
QueryNodeAddr
*
pEp
)
{
setExchangSourceNode
(
templateId
,
p
Ep
,
subplan
->
pNode
);
void
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
DownstreamSource
*
pSource
)
{
setExchangSourceNode
(
templateId
,
p
Source
,
subplan
->
pNode
);
}
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
1832a509
...
...
@@ -729,40 +729,75 @@ static bool epAddrFromJson(const cJSON* json, void* obj) {
return
true
;
}
static
const
char
*
jkNodeAddrId
=
"NodeId"
;
static
const
char
*
jkNodeAddrInUse
=
"InUse"
;
static
const
char
*
jkNodeAddrEpAddrs
=
"EpAddrs"
;
static
const
char
*
jkNodeAddrId
=
"NodeId"
;
static
const
char
*
jkNodeAddrInUse
=
"InUse"
;
static
const
char
*
jkNodeAddrEpAddrs
=
"Ep"
;
static
const
char
*
jkNodeAddr
=
"NodeAddr"
;
static
const
char
*
jkNodeTaskId
=
"TaskId"
;
static
const
char
*
jkNodeTaskSchedId
=
"SchedId"
;
static
bool
queryNodeAddrToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SQueryNodeAddr
*
pAddr
=
(
const
SQueryNodeAddr
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkNodeAddrId
,
pAddr
->
nodeId
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkNodeAddrInUse
,
pAddr
->
inUse
);
}
if
(
res
)
{
res
=
addRawArray
(
json
,
jkNodeAddrEpAddrs
,
epAddrToJson
,
pAddr
->
epAddr
,
sizeof
(
SEpAddr
),
pAddr
->
numOfEps
);
}
return
res
;
}
static
bool
queryNodeAddrFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SQueryNodeAddr
*
pAddr
=
(
SQueryNodeAddr
*
)
obj
;
pAddr
->
nodeId
=
getNumber
(
json
,
jkNodeAddrId
);
pAddr
->
inUse
=
getNumber
(
json
,
jkNodeAddrInUse
);
int32_t
numOfEps
=
0
;
bool
res
=
fromRawArray
(
json
,
jkNodeAddrEpAddrs
,
epAddrFromJson
,
pAddr
->
epAddr
,
sizeof
(
SEpAddr
),
&
numOfEps
);
pAddr
->
numOfEps
=
numOfEps
;
return
res
;
}
static
bool
nodeAddrToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SQueryNodeAddr
*
ep
=
(
const
SQueryNodeAddr
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkNodeAddrId
,
ep
->
nodeId
);
const
SDownstreamSource
*
pSource
=
(
const
SDownstreamSource
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkNodeTaskId
,
pSource
->
taskId
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkNodeAddrInUse
,
ep
->
inUse
);
char
t
[
30
]
=
{
0
};
snprintf
(
t
,
tListLen
(
t
),
"%"
PRIu64
,
pSource
->
schedId
);
res
=
cJSON_AddStringToObject
(
json
,
jkNodeTaskSchedId
,
t
);
}
if
(
res
)
{
res
=
add
RawArray
(
json
,
jkNodeAddrEpAddrs
,
epAddrToJson
,
ep
->
epAddr
,
sizeof
(
SEpAddr
),
ep
->
numOfEps
);
res
=
add
Object
(
json
,
jkNodeAddr
,
queryNodeAddrToJson
,
&
pSource
->
addr
);
}
return
res
;
}
static
bool
nodeAddrFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SQueryNodeAddr
*
ep
=
(
SQueryNodeAddr
*
)
obj
;
ep
->
nodeId
=
getNumber
(
json
,
jkNodeAddrId
);
ep
->
inUse
=
getNumber
(
json
,
jkNodeAddrInUse
);
int32_t
numOfEps
=
0
;
bool
res
=
fromRawArray
(
json
,
jkNodeAddrEpAddrs
,
nodeAddrFromJson
,
&
ep
->
epAddr
,
sizeof
(
SEpAddr
),
&
numOfEps
);
ep
->
numOfEps
=
numOfEps
;
SDownstreamSource
*
pSource
=
(
SDownstreamSource
*
)
obj
;
pSource
->
taskId
=
getNumber
(
json
,
jkNodeTaskId
);
char
*
pSchedId
=
getString
(
json
,
jkNodeTaskSchedId
);
pSource
->
schedId
=
strtoll
(
pSchedId
,
NULL
,
10
);
tfree
(
pSchedId
);
bool
res
=
fromObject
(
json
,
jkNodeAddr
,
queryNodeAddrFromJson
,
&
pSource
->
addr
,
true
);
return
res
;
}
static
const
char
*
jkExchangeNodeSrcTemplateId
=
"SrcTemplateId"
;
static
const
char
*
jkExchangeNodeSrcEndPoints
=
"Src
EndPoint
s"
;
static
const
char
*
jkExchangeNodeSrcEndPoints
=
"Src
Addr
s"
;
static
bool
exchangeNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SExchangePhyNode
*
exchange
=
(
const
SExchangePhyNode
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkExchangeNodeSrcTemplateId
,
exchange
->
srcTemplateId
);
if
(
res
)
{
res
=
add
InlineArray
(
json
,
jkExchangeNodeSrcEndPoints
,
nodeAddrToJson
,
exchange
->
pSrcEndPoints
);
res
=
add
RawArray
(
json
,
jkExchangeNodeSrcEndPoints
,
nodeAddrToJson
,
exchange
->
pSrcEndPoints
->
pData
,
sizeof
(
SDownstreamSource
),
taosArrayGetSize
(
exchange
->
pSrcEndPoints
)
);
}
return
res
;
}
...
...
@@ -770,7 +805,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) {
static
bool
exchangeNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SExchangePhyNode
*
exchange
=
(
SExchangePhyNode
*
)
obj
;
exchange
->
srcTemplateId
=
getNumber
(
json
,
jkExchangeNodeSrcTemplateId
);
return
fromInlineArray
(
json
,
jkExchangeNodeSrcEndPoints
,
nodeAddrFromJson
,
&
exchange
->
pSrcEndPoints
,
sizeof
(
S
QueryNodeAddr
));
return
fromInlineArray
(
json
,
jkExchangeNodeSrcEndPoints
,
nodeAddrFromJson
,
&
exchange
->
pSrcEndPoints
,
sizeof
(
S
DownstreamSource
));
}
static
bool
specificPhyNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
...
...
@@ -979,7 +1014,11 @@ static const char* jkIdSubplanId = "SubplanId";
static
bool
subplanIdToJson
(
const
void
*
obj
,
cJSON
*
jId
)
{
const
SSubplanId
*
id
=
(
const
SSubplanId
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
jId
,
jkIdQueryId
,
id
->
queryId
);
char
ids
[
40
]
=
{
0
};
snprintf
(
ids
,
tListLen
(
ids
),
"%"
PRIu64
,
id
->
queryId
);
bool
res
=
cJSON_AddStringToObject
(
jId
,
jkIdQueryId
,
ids
);
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
jId
,
jkIdTemplateId
,
id
->
templateId
);
}
...
...
@@ -991,7 +1030,11 @@ static bool subplanIdToJson(const void* obj, cJSON* jId) {
static
bool
subplanIdFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SSubplanId
*
id
=
(
SSubplanId
*
)
obj
;
id
->
queryId
=
getNumber
(
json
,
jkIdQueryId
);
char
*
queryId
=
getString
(
json
,
jkIdQueryId
);
id
->
queryId
=
strtoll
(
queryId
,
NULL
,
0
);
tfree
(
queryId
);
id
->
templateId
=
getNumber
(
json
,
jkIdTemplateId
);
id
->
subplanId
=
getNumber
(
json
,
jkIdSubplanId
);
return
true
;
...
...
source/libs/planner/src/planner.c
浏览文件 @
1832a509
...
...
@@ -87,8 +87,8 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
return
TSDB_CODE_SUCCESS
;
}
void
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
QueryNodeAddr
*
ep
)
{
setSubplanExecutionNode
(
subplan
,
templateId
,
ep
);
void
qSetSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
S
DownstreamSource
*
pSource
)
{
setSubplanExecutionNode
(
subplan
,
templateId
,
pSource
);
}
int32_t
qSubPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
,
int32_t
*
len
)
{
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
1832a509
...
...
@@ -20,12 +20,38 @@
static
SSchedulerMgmt
schMgmt
=
{
0
};
uint64_t
schGenTaskId
(
void
)
{
return
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
}
uint64_t
schGenUUID
(
void
)
{
static
uint64_t
hashId
=
0
;
static
int32_t
requestSerialId
=
0
;
if
(
hashId
==
0
)
{
char
uid
[
64
];
int32_t
code
=
taosGetSystemUUID
(
uid
,
tListLen
(
uid
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"Failed to get the system uid, reason:%s"
,
tstrerror
(
TAOS_SYSTEM_ERROR
(
errno
)));
}
else
{
hashId
=
MurmurHash3_32
(
uid
,
strlen
(
uid
));
}
}
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
requestSerialId
,
1
);
uint64_t
id
=
((
hashId
&
0x0FFF
)
<<
52
)
|
((
pid
&
0x0FFF
)
<<
40
)
|
((
ts
&
0xFFFFFF
)
<<
16
)
|
(
val
&
0xFFFF
);
return
id
;
}
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
pTask
->
plan
=
pPlan
;
pTask
->
level
=
pLevel
;
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
pTask
->
taskId
=
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
pTask
->
taskId
=
schGenTaskId
(
);
pTask
->
execAddrs
=
taosArrayInit
(
SCH_MAX_CANDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
pTask
->
execAddrs
)
{
SCH_TASK_ELOG
(
"taosArrayInit %d exec addrs failed"
,
SCH_MAX_CANDIDATE_EP_NUM
);
...
...
@@ -40,8 +66,7 @@ void schFreeTask(SSchTask* pTask) {
taosArrayDestroy
(
pTask
->
candidateAddrs
);
}
// TODO NEED TO VERFY WITH ASYNC_SEND MEMORY FREE
//tfree(pTask->msg);
tfree
(
pTask
->
msg
);
if
(
pTask
->
children
)
{
taosArrayDestroy
(
pTask
->
children
);
...
...
@@ -71,7 +96,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_TASK_ELOG
(
"rsp msg type mis-match, last sent msgType:%d, rspType:%d"
,
lastMsgType
,
msgType
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_EXECUTING
&&
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_TASK_ELOG
(
"rsp msg conflicted with task status, status:%d, rspType:%d"
,
SCH_GET_TASK_STATUS
(
pTask
),
msgType
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
...
...
@@ -141,7 +166,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break
;
case
JOB_TASK_STATUS_CANCELLED
:
case
JOB_TASK_STATUS_DROPPING
:
SCH_ERR_JRET
(
TSDB_CODE_QRY_
APP_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_
JOB_FREED
);
break
;
default:
...
...
@@ -541,12 +566,9 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b
return
TSDB_CODE_SUCCESS
;
}
// Note: no more error processing, handled in function internal
int32_t
schProcessOnJobFailure
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
int32_t
schProcessOnJobFailureImpl
(
SSchJob
*
pJob
,
int32_t
status
,
int32_t
errCode
)
{
// if already FAILED, no more processing
SCH_ERR_RET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_FAILED
));
SCH_ERR_RET
(
schCheckAndUpdateJobStatus
(
pJob
,
status
));
if
(
errCode
)
{
atomic_store_32
(
&
pJob
->
errCode
,
errCode
);
...
...
@@ -563,6 +585,17 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
// Note: no more error processing, handled in function internal
int32_t
schProcessOnJobFailure
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
SCH_RET
(
schProcessOnJobFailureImpl
(
pJob
,
JOB_TASK_STATUS_FAILED
,
errCode
));
}
// Note: no more error processing, handled in function internal
int32_t
schProcessOnJobDropped
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
SCH_RET
(
schProcessOnJobFailureImpl
(
pJob
,
JOB_TASK_STATUS_DROPPING
,
errCode
));
}
// Note: no more error processing, handled in function internal
int32_t
schFetchFromRemote
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
...
...
@@ -749,7 +782,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
atomic_add_fetch_32
(
&
par
->
childReady
,
1
);
SCH_LOCK
(
SCH_WRITE
,
&
par
->
lock
);
qSetSubplanExecutionNode
(
par
->
plan
,
pTask
->
plan
->
id
.
templateId
,
&
pTask
->
succeedAddr
);
SDownstreamSource
source
=
{.
taskId
=
pTask
->
taskId
,
.
schedId
=
schMgmt
.
sId
,
.
addr
=
pTask
->
succeedAddr
};
qSetSubplanExecutionNode
(
par
->
plan
,
pTask
->
plan
->
id
.
templateId
,
&
source
);
SCH_UNLOCK
(
SCH_WRITE
,
&
par
->
lock
);
if
(
SCH_TASK_READY_TO_LUNCH
(
par
))
{
...
...
@@ -820,7 +854,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SResReadyRsp
*
rsp
=
(
SResReadyRsp
*
)
msg
;
if
(
rspCode
!=
TSDB_CODE_SUCCESS
||
NULL
==
msg
||
rsp
->
code
!=
TSDB_CODE_SUCCESS
)
{
SCH_ERR_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
rsp
->
c
ode
));
SCH_ERR_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
rsp
C
ode
));
}
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
...
...
@@ -834,9 +868,19 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
rspCode
));
}
if
(
pJob
->
res
)
{
SCH_TASK_ELOG
(
"got fetch rsp while res already exists, res:%p"
,
pJob
->
res
);
tfree
(
rsp
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
atomic_store_ptr
(
&
pJob
->
res
,
rsp
);
atomic_store_32
(
&
pJob
->
resNumOfRows
,
rsp
->
numOfRows
);
if
(
rsp
->
completed
)
{
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_SUCCEED
);
}
SCH_ERR_JRET
(
schProcessOnDataFetched
(
pJob
));
break
;
...
...
@@ -871,7 +915,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
SSchJob
**
job
=
taosHashGet
(
schMgmt
.
jobs
,
&
pParam
->
queryId
,
sizeof
(
pParam
->
queryId
));
if
(
NULL
==
job
||
NULL
==
(
*
job
))
{
qError
(
"QID:%"
PRIx64
" taosHashGet queryId not exist, may be dropped"
,
pParam
->
queryId
);
SCH_ERR_JRET
(
TSDB_CODE_
SCH_INTERNAL_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_
QRY_JOB_FREED
);
}
pJob
=
*
job
;
...
...
@@ -880,13 +924,13 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
int32_t
s
=
taosHashGetSize
(
pJob
->
execTasks
);
if
(
s
<=
0
)
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
x
64
" no task in execTask list"
,
pParam
->
queryId
,
pParam
->
taskId
);
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
d
64
" no task in execTask list"
,
pParam
->
queryId
,
pParam
->
taskId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SSchTask
**
task
=
taosHashGet
(
pJob
->
execTasks
,
&
pParam
->
taskId
,
sizeof
(
pParam
->
taskId
));
if
(
NULL
==
task
||
NULL
==
(
*
task
))
{
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
x
64
" taosHashGet taskId not exist"
,
pParam
->
queryId
,
pParam
->
taskId
);
qError
(
"QID:%"
PRIx64
",TID:%"
PRI
d
64
" taosHashGet taskId not exist"
,
pParam
->
queryId
,
pParam
->
taskId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
...
...
@@ -1033,7 +1077,13 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case
TDMT_VND_CREATE_TABLE
:
case
TDMT_VND_SUBMIT
:
{
msgSize
=
pTask
->
msgLen
;
msg
=
pTask
->
msg
;
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memcpy
(
msg
,
pTask
->
msg
,
msgSize
);
break
;
}
...
...
@@ -1430,13 +1480,13 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SSubQueryMsg
*
pMsg
=
msg
;
SSubQueryMsg
*
pMsg
=
(
SSubQueryMsg
*
)
msg
;
pMsg
->
header
.
vgId
=
htonl
(
tInfo
.
addr
.
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
plan
->
id
.
queryId
);
pMsg
->
taskId
=
htobe64
(
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
));
pMsg
->
taskId
=
htobe64
(
schGenUUID
(
));
pMsg
->
contentLen
=
htonl
(
msgLen
);
memcpy
(
pMsg
->
msg
,
msg
,
msgLen
);
...
...
@@ -1459,6 +1509,52 @@ _return:
SCH_RET
(
code
);
}
int32_t
schedulerCopyTask
(
STaskInfo
*
src
,
SArray
**
dst
,
int32_t
copyNum
)
{
if
(
NULL
==
src
||
NULL
==
dst
||
copyNum
<=
0
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
*
dst
=
taosArrayInit
(
copyNum
,
sizeof
(
STaskInfo
));
if
(
NULL
==
*
dst
)
{
qError
(
"taosArrayInit %d taskInfo failed"
,
copyNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
int32_t
msgSize
=
src
->
msg
->
contentLen
+
sizeof
(
*
src
->
msg
);
STaskInfo
info
=
{
0
};
info
.
addr
=
src
->
addr
;
for
(
int32_t
i
=
0
;
i
<
copyNum
;
++
i
)
{
info
.
msg
=
malloc
(
msgSize
);
if
(
NULL
==
info
.
msg
)
{
qError
(
"malloc %d failed"
,
msgSize
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memcpy
(
info
.
msg
,
src
->
msg
,
msgSize
);
info
.
msg
->
taskId
=
schGenUUID
();
if
(
NULL
==
taosArrayPush
(
*
dst
,
&
info
))
{
qError
(
"taosArrayPush failed, idx:%d"
,
i
);
free
(
info
.
msg
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
return
TSDB_CODE_SUCCESS
;
_return:
schedulerFreeTaskList
(
*
dst
);
*
dst
=
NULL
;
SCH_RET
(
code
);
}
int32_t
scheduleFetchRows
(
SSchJob
*
pJob
,
void
**
pData
)
{
if
(
NULL
==
pJob
||
NULL
==
pData
)
{
...
...
@@ -1466,33 +1562,29 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
}
int32_t
code
=
0
;
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_DROPPING
)
{
SCH_JOB_ELOG
(
"job is dropping, status:%d"
,
status
);
return
TSDB_CODE_SCH_STATUS_ERROR
;
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
)
;
}
atomic_add_fetch_32
(
&
pJob
->
ref
,
1
);
if
(
!
SCH_JOB_NEED_FETCH
(
&
pJob
->
attr
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%d"
,
SCH_GET_JOB_STATUS
(
pJob
));
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
0
,
1
)
!=
0
)
{
SCH_JOB_ELOG
(
"prior fetching not finished, userFetch:%d"
,
atomic_load_8
(
&
pJob
->
userFetch
));
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
status
==
JOB_TASK_STATUS_FAILED
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
atomic_store_ptr
(
&
pJob
->
res
,
NULL
);
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%d"
,
status
);
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
else
if
(
status
==
JOB_TASK_STATUS_SUCCEED
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
atomic_store_ptr
(
&
pJob
->
res
,
NULL
);
SCH_JOB_ELOG
(
"job already succeed, status:%d"
,
status
);
goto
_return
;
}
else
if
(
status
==
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_ERR_JRET
(
schFetchFromRemote
(
pJob
));
...
...
@@ -1502,15 +1594,17 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_FAILED
)
{
code
=
atomic_load_32
(
&
pJob
->
errCode
);
SCH_ERR_JRET
(
code
);
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%d"
,
status
);
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
)
);
}
if
(
pJob
->
res
&&
((
SRetrieveTableRsp
*
)
pJob
->
res
)
->
completed
)
{
SCH_ERR_JRET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCCEED
));
}
_return:
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
...
...
@@ -1521,10 +1615,19 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
break
;
}
_return:
if
(
NULL
==
*
pData
)
{
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
calloc
(
1
,
sizeof
(
SRetrieveTableRsp
));
if
(
rsp
)
{
rsp
->
completed
=
1
;
}
*
pData
=
rsp
;
}
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
SCH_JOB_DLOG
(
"fetch done, code:%x"
,
code
);
atomic_sub_fetch_32
(
&
pJob
->
ref
,
1
);
SCH_RET
(
code
);
...
...
@@ -1549,6 +1652,7 @@ void scheduleFreeJob(void *job) {
SSchJob
*
pJob
=
job
;
uint64_t
queryId
=
pJob
->
queryId
;
bool
setJobFree
=
false
;
if
(
SCH_GET_JOB_STATUS
(
pJob
)
>
0
)
{
if
(
0
!=
taosHashRemove
(
schMgmt
.
jobs
,
&
pJob
->
queryId
,
sizeof
(
pJob
->
queryId
)))
{
...
...
@@ -1556,8 +1660,6 @@ void scheduleFreeJob(void *job) {
return
;
}
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_DROPPING
);
SCH_JOB_DLOG
(
"job removed from list, no further ref, ref:%d"
,
atomic_load_32
(
&
pJob
->
ref
));
while
(
true
)
{
...
...
@@ -1565,6 +1667,11 @@ void scheduleFreeJob(void *job) {
if
(
0
==
ref
)
{
break
;
}
else
if
(
ref
>
0
)
{
if
(
1
==
ref
&&
atomic_load_8
(
&
pJob
->
userFetch
)
>
0
&&
!
setJobFree
)
{
schProcessOnJobDropped
(
pJob
,
TSDB_CODE_QRY_JOB_FREED
);
setJobFree
=
true
;
}
usleep
(
1
);
}
else
{
assert
(
0
);
...
...
@@ -1600,6 +1707,7 @@ void scheduleFreeJob(void *job) {
taosHashCleanup
(
pJob
->
succTasks
);
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
tfree
(
pJob
->
res
);
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
1832a509
...
...
@@ -38,6 +38,20 @@
namespace
{
extern
"C"
int32_t
schHandleResponseMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
extern
"C"
int32_t
schHandleCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
msgType
,
int32_t
rspCode
);
struct
SSchJob
*
pInsertJob
=
NULL
;
struct
SSchJob
*
pQueryJob
=
NULL
;
uint64_t
schtMergeTemplateId
=
0x4
;
uint64_t
schtFetchTaskId
=
0
;
uint64_t
schtQueryId
=
1
;
bool
schtTestStop
=
false
;
bool
schtTestDeadLoop
=
false
;
int32_t
schtTestMTRunSec
=
10
;
int32_t
schtTestPrintNum
=
1000
;
int32_t
schtStartFetch
=
0
;
void
schtInitLogFile
()
{
...
...
@@ -57,7 +71,7 @@ void schtInitLogFile() {
void
schtBuildQueryDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x0000000000000001
;
uint64_t
qId
=
schtQueryId
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
...
...
@@ -84,7 +98,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan
->
msgType
=
TDMT_VND_QUERY
;
mergePlan
->
id
.
queryId
=
qId
;
mergePlan
->
id
.
templateId
=
0x4444444444
;
mergePlan
->
id
.
templateId
=
schtMergeTemplateId
;
mergePlan
->
id
.
subplanId
=
0x5555555555
;
mergePlan
->
type
=
QUERY_TYPE_MERGE
;
mergePlan
->
level
=
0
;
...
...
@@ -173,8 +187,6 @@ void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int6
}
void
schtSetPlanToString
()
{
static
Stub
stub
;
stub
.
set
(
qSubPlanToString
,
schtPlanToString
);
...
...
@@ -214,7 +226,12 @@ void schtSetRpcSendRequest() {
}
}
int32_t
schtAsyncSendMsgToServer
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
)
{
int32_t
schtAsyncSendMsgToServer
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
SMsgSendInfo
*
pInfo
)
{
if
(
pInfo
)
{
tfree
(
pInfo
->
param
);
tfree
(
pInfo
->
msgInfo
.
pData
);
free
(
pInfo
);
}
return
0
;
}
...
...
@@ -269,15 +286,224 @@ void *schtCreateFetchRspThread(void *param) {
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
calloc
(
1
,
sizeof
(
SRetrieveTableRsp
));
rsp
->
completed
=
1
;
rsp
->
numOfRows
=
10
;
code
=
schHandleResponseMsg
(
job
,
job
->
fetchTask
,
TDMT_VND_FETCH_RSP
,
(
char
*
)
rsp
,
sizeof
(
rsp
),
0
);
code
=
schHandleResponseMsg
(
job
,
job
->
fetchTask
,
TDMT_VND_FETCH_RSP
,
(
char
*
)
rsp
,
sizeof
(
*
rsp
),
0
);
assert
(
code
==
0
);
}
void
*
schtFetchRspThread
(
void
*
aa
)
{
SDataBuf
dataBuf
=
{
0
};
SSchCallbackParam
*
param
=
NULL
;
while
(
!
schtTestStop
)
{
if
(
0
==
atomic_val_compare_exchange_32
(
&
schtStartFetch
,
1
,
0
))
{
continue
;
}
usleep
(
1
);
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
param
->
taskId
=
schtFetchTaskId
;
int32_t
code
=
0
;
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
calloc
(
1
,
sizeof
(
SRetrieveTableRsp
));
rsp
->
completed
=
1
;
rsp
->
numOfRows
=
10
;
dataBuf
.
pData
=
rsp
;
dataBuf
.
len
=
sizeof
(
*
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_FETCH_RSP
,
0
);
assert
(
code
==
0
||
code
);
}
}
void
schtFreeQueryJob
(
int32_t
freeThread
)
{
static
uint32_t
freeNum
=
0
;
SSchJob
*
job
=
atomic_load_ptr
(
&
pQueryJob
);
if
(
job
&&
atomic_val_compare_exchange_ptr
(
&
pQueryJob
,
job
,
NULL
))
{
scheduleFreeJob
(
job
);
if
(
freeThread
)
{
if
(
++
freeNum
%
schtTestPrintNum
==
0
)
{
printf
(
"FreeNum:%d
\n
"
,
freeNum
);
}
}
}
}
void
*
schtRunJobThread
(
void
*
aa
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
char
*
clusterId
=
"cluster1"
;
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
SVgroupInfo
vgInfo
=
{
0
};
SQueryDag
dag
=
{
0
};
schtInitLogFile
();
int32_t
code
=
schedulerInit
(
NULL
);
assert
(
code
==
0
);
schtSetPlanToString
();
schtSetExecNode
();
schtSetAsyncSendMsgToServer
();
SSchJob
*
job
=
NULL
;
SSchCallbackParam
*
param
=
NULL
;
SHashObj
*
execTasks
=
NULL
;
SDataBuf
dataBuf
=
{
0
};
uint32_t
jobFinished
=
0
;
while
(
!
schtTestStop
)
{
schtBuildQueryDag
(
&
dag
);
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEpAddr
));
SEpAddr
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
code
=
scheduleAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
job
);
assert
(
code
==
0
);
execTasks
=
taosHashInit
(
5
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
void
*
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
schtFetchTaskId
=
task
->
taskId
-
1
;
taosHashPut
(
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
task
,
sizeof
(
*
task
));
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pQueryJob
=
job
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
;
SQueryTableRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_QUERY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
;
SResReadyRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_RES_READY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
-
1
;
SQueryTableRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_QUERY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
param
=
(
SSchCallbackParam
*
)
calloc
(
1
,
sizeof
(
*
param
));
param
->
queryId
=
schtQueryId
;
pIter
=
taosHashIterate
(
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
(
SSchTask
*
)
pIter
;
param
->
taskId
=
task
->
taskId
-
1
;
SResReadyRsp
rsp
=
{
0
};
dataBuf
.
pData
=
&
rsp
;
dataBuf
.
len
=
sizeof
(
rsp
);
code
=
schHandleCallback
(
param
,
&
dataBuf
,
TDMT_VND_RES_READY_RSP
,
0
);
assert
(
code
==
0
||
code
);
pIter
=
taosHashIterate
(
execTasks
,
pIter
);
}
atomic_store_32
(
&
schtStartFetch
,
1
);
void
*
data
=
NULL
;
code
=
scheduleFetchRows
(
pQueryJob
,
&
data
);
assert
(
code
==
0
||
code
);
if
(
0
==
code
)
{
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
assert
(
pRsp
->
completed
==
1
);
assert
(
pRsp
->
numOfRows
==
10
);
}
data
=
NULL
;
code
=
scheduleFetchRows
(
pQueryJob
,
&
data
);
assert
(
code
==
0
||
code
);
schtFreeQueryJob
(
0
);
taosHashCleanup
(
execTasks
);
schtFreeQueryDag
(
&
dag
);
if
(
++
jobFinished
%
schtTestPrintNum
==
0
)
{
printf
(
"jobFinished:%d
\n
"
,
jobFinished
);
}
++
schtQueryId
;
}
schedulerDestroy
();
}
void
*
schtFreeJobThread
(
void
*
aa
)
{
while
(
!
schtTestStop
)
{
usleep
(
rand
()
%
100
);
schtFreeQueryJob
(
1
);
}
}
struct
SSchJob
*
pInsertJob
=
NULL
;
}
TEST
(
queryTest
,
normalCase
)
{
...
...
@@ -368,11 +594,12 @@ TEST(queryTest, normalCase) {
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
ASSERT_EQ
(
pRsp
->
completed
,
1
);
ASSERT_EQ
(
pRsp
->
numOfRows
,
10
);
tfree
(
data
);
data
=
NULL
;
code
=
scheduleFetchRows
(
job
,
&
data
);
ASSERT_EQ
(
code
,
0
);
ASSERT_
EQ
(
data
,
(
void
*
)
NULL
);
ASSERT_
TRUE
(
data
);
scheduleFreeJob
(
pJob
);
...
...
@@ -383,7 +610,6 @@ TEST(queryTest, normalCase) {
TEST
(
insertTest
,
normalCase
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
char
*
clusterId
=
"cluster1"
;
...
...
@@ -427,13 +653,29 @@ TEST(insertTest, normalCase) {
}
TEST
(
multiThread
,
forceFree
)
{
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
schtInitLogFile
();
pthread_t
thread1
,
thread2
,
thread3
;
pthread_create
(
&
(
thread1
),
&
thattr
,
schtRunJobThread
,
NULL
);
pthread_create
(
&
(
thread2
),
&
thattr
,
schtFreeJobThread
,
NULL
);
pthread_create
(
&
(
thread3
),
&
thattr
,
schtFetchRspThread
,
NULL
);
while
(
true
)
{
if
(
schtTestDeadLoop
)
{
sleep
(
1
);
}
else
{
sleep
(
schtTestMTRunSec
);
break
;
}
}
schtTestStop
=
true
;
sleep
(
3
);
}
int
main
(
int
argc
,
char
**
argv
)
{
srand
(
time
(
NULL
));
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
...
...
source/libs/transport/inc/transComm.h
0 → 100644
浏览文件 @
1832a509
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include <uv.h>
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transportInt.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
typedef
void
*
queue
[
2
];
/* Private macros. */
#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue**)&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Initialize an empty queue. */
#define QUEUE_INIT(q) \
{ \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
}
/* Return true if the queue has no element. */
#define QUEUE_IS_EMPTY(q) ((const queue*)(q) == (const queue*)QUEUE_NEXT(q))
/* Insert an element at the back of a queue. */
#define QUEUE_PUSH(q, e) \
{ \
QUEUE_NEXT(e) = (q); \
QUEUE_PREV(e) = QUEUE_PREV(q); \
QUEUE_PREV_NEXT(e) = (e); \
QUEUE_PREV(q) = (e); \
}
/* Remove the given element from the queue. Any element can be removed at any *
* time. */
#define QUEUE_REMOVE(e) \
{ \
QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
}
/* Return the element at the front of the queue. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
/* Return the element at the back of the queue. */
#define QUEUE_TAIL(q) (QUEUE_PREV(q))
/* Iterate over the element of a queue. * Mutating the queue while iterating
* results in undefined behavior. */
#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
typedef
struct
{
SRpcInfo
*
pRpc
;
// associated SRpcInfo
SEpSet
epSet
;
// ip list provided by app
void
*
ahandle
;
// handle provided by app
struct
SRpcConn
*
pConn
;
// pConn allocated
tmsg_t
msgType
;
// message type
uint8_t
*
pCont
;
// content provided by app
int32_t
contLen
;
// content length
int32_t
code
;
// error code
int16_t
numOfTry
;
// number of try for different servers
int8_t
oldInUse
;
// server EP inUse passed by app
int8_t
redirect
;
// flag to indicate redirect
int8_t
connType
;
// connection type
int64_t
rid
;
// refId returned by taosAddRef
SRpcMsg
*
pRsp
;
// for synchronous API
tsem_t
*
pSem
;
// for synchronous API
SEpSet
*
pSet
;
// for synchronous API
char
msg
[
0
];
// RpcHead starts from here
}
SRpcReqContext
;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext))
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
int
rpcAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
);
void
rpcBuildAuthHead
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
);
int32_t
rpcCompressRpcMsg
(
char
*
pCont
,
int32_t
contLen
);
SRpcHead
*
rpcDecompressRpcMsg
(
SRpcHead
*
pHead
);
#endif
source/libs/transport/inc/transportInt.h
浏览文件 @
1832a509
...
...
@@ -16,62 +16,61 @@
#ifndef _TD_TRANSPORT_INT_H_
#define _TD_TRANSPORT_INT_H_
#ifdef USE_UV
#include <uv.h>
#endif
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#ifdef USE_UV
#include <stddef.h>
typedef
void
*
queue
[
2
];
/* Private macros. */
#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue **)&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Initialize an empty queue. */
#define QUEUE_INIT(q) \
{ \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
}
/* Return true if the queue has no element. */
#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q))
/* Insert an element at the back of a queue. */
#define QUEUE_PUSH(q, e) \
{ \
QUEUE_NEXT(e) = (q); \
QUEUE_PREV(e) = QUEUE_PREV(q); \
QUEUE_PREV_NEXT(e) = (e); \
QUEUE_PREV(q) = (e); \
}
/* Remove the given element from the queue. Any element can be removed at any *
* time. */
#define QUEUE_REMOVE(e) \
{ \
QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
}
/* Return the element at the front of the queue. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
/* Return the element at the back of the queue. */
#define QUEUE_TAIL(q) (QUEUE_PREV(q))
/* Iterate over the element of a queue. * Mutating the queue while iterating
* results in undefined behavior. */
#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field))))
void
*
taosInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
*
taosInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
typedef
struct
{
int
sessions
;
// number of sessions allowed
int
numOfThreads
;
// number of threads to process incoming messages
int
idleTime
;
// milliseconds;
uint16_t
localPort
;
int8_t
connType
;
int64_t
index
;
char
label
[
TSDB_LABEL_LEN
];
char
user
[
TSDB_UNI_LEN
];
// meter ID
char
spi
;
// security parameter index
char
encrypt
;
// encrypt algorithm
char
secret
[
TSDB_PASSWORD_LEN
];
// secret for the link
char
ckey
[
TSDB_PASSWORD_LEN
];
// ciphering key
void
(
*
cfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
);
int
(
*
afp
)(
void
*
parent
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
int32_t
refCount
;
void
*
parent
;
void
*
idPool
;
// handle to ID pool
void
*
tmrCtrl
;
// handle to timer
SHashObj
*
hash
;
// handle returned by hash utility
void
*
tcphandle
;
// returned handle from TCP initialization
pthread_mutex_t
mutex
;
}
SRpcInfo
;
#endif // USE_LIBUV
...
...
source/libs/transport/src/trans.c
0 → 100644
浏览文件 @
1832a509
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include "transComm.h"
typedef
struct
SConnBuffer
{
char
*
buf
;
int
len
;
int
cap
;
int
left
;
}
SConnBuffer
;
void
*
(
*
taosHandle
[])(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
=
{
taosInitServer
,
taosInitClient
};
void
*
rpcOpen
(
const
SRpcInit
*
pInit
)
{
SRpcInfo
*
pRpc
=
calloc
(
1
,
sizeof
(
SRpcInfo
));
if
(
pRpc
==
NULL
)
{
return
NULL
;
}
if
(
pInit
->
label
)
{
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
strlen
(
pInit
->
label
));
}
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
tcphandle
=
(
*
taosHandle
[
pRpc
->
connType
])(
0
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
return
pRpc
;
}
void
rpcClose
(
void
*
arg
)
{
return
;
}
void
*
rpcMallocCont
(
int
contLen
)
{
int
size
=
contLen
+
RPC_MSG_OVERHEAD
;
char
*
start
=
(
char
*
)
calloc
(
1
,
(
size_t
)
size
);
if
(
start
==
NULL
)
{
tError
(
"failed to malloc msg, size:%d"
,
size
);
return
NULL
;
}
else
{
tTrace
(
"malloc mem:%p size:%d"
,
start
,
size
);
}
return
start
+
sizeof
(
SRpcReqContext
)
+
sizeof
(
SRpcHead
);
}
void
rpcFreeCont
(
void
*
cont
)
{
return
;
}
void
*
rpcReallocCont
(
void
*
ptr
,
int
contLen
)
{
return
NULL
;
}
void
rpcSendRedirectRsp
(
void
*
pConn
,
const
SEpSet
*
pEpSet
)
{}
int
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
return
-
1
;
}
void
rpcSendRecv
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
)
{
return
;
}
int
rpcReportProgress
(
void
*
pConn
,
char
*
pCont
,
int
contLen
)
{
return
-
1
;
}
void
rpcCancelRequest
(
int64_t
rid
)
{
return
;
}
int32_t
rpcInit
(
void
)
{
// impl later
return
-
1
;
}
void
rpcCleanup
(
void
)
{
// impl later
return
;
}
#endif
source/libs/transport/src/transCli.c
0 → 100644
浏览文件 @
1832a509
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include "transComm.h"
typedef
struct
SCliConn
{
uv_connect_t
connReq
;
uv_stream_t
*
stream
;
void
*
data
;
queue
conn
;
}
SCliConn
;
typedef
struct
SCliMsg
{
SRpcReqContext
*
context
;
queue
q
;
}
SCliMsg
;
typedef
struct
SCliThrdObj
{
pthread_t
thread
;
uv_loop_t
*
loop
;
uv_async_t
*
cliAsync
;
//
void
*
cache
;
// conn pool
queue
msg
;
pthread_mutex_t
msgMtx
;
void
*
shandle
;
}
SCliThrdObj
;
typedef
struct
SClientObj
{
char
label
[
TSDB_LABEL_LEN
];
int32_t
index
;
int
numOfThreads
;
SCliThrdObj
**
pThreadObj
;
}
SClientObj
;
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
);
static
void
clientReadCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
static
void
clientConnCb
(
struct
uv_connect_s
*
req
,
int
status
);
static
void
clientAsyncCb
(
uv_async_t
*
handle
);
static
void
*
clientThread
(
void
*
arg
);
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
)
{
// impl later
}
static
void
clientFailedCb
(
uv_handle_t
*
handle
)
{
// impl later
tDebug
(
"close handle"
);
}
static
void
clientReadCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
// impl later
}
static
void
clientConnCb
(
struct
uv_connect_s
*
req
,
int
status
)
{
SCliConn
*
pConn
=
req
->
data
;
SCliMsg
*
pMsg
=
pConn
->
data
;
SEpSet
*
pEpSet
=
&
pMsg
->
context
->
epSet
;
char
*
fqdn
=
pEpSet
->
fqdn
[
pEpSet
->
inUse
];
uint32_t
port
=
pEpSet
->
port
[
pEpSet
->
inUse
];
if
(
status
!=
0
)
{
// call user fp later
tError
(
"failed to connect server(%s, %d), errmsg: %s"
,
fqdn
,
port
,
uv_strerror
(
status
));
uv_close
((
uv_handle_t
*
)
req
->
handle
,
clientFailedCb
);
return
;
}
assert
(
pConn
->
stream
==
req
->
handle
);
// impl later
}
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
)
{
// impl later
return
NULL
;
}
// uv_async callback on a client thread: drain one queued request and either
// reuse a cached connection or start an asynchronous connect for a new one.
static void clientAsyncCb(uv_async_t* handle) {
  SCliThrdObj* pThrd = handle->data;
  SCliMsg*     pMsg = NULL;

  // Pop at most one message under the lock. libuv coalesces uv_async_send
  // notifications, so waking up with an empty queue is legal.
  pthread_mutex_lock(&pThrd->msgMtx);
  if (!QUEUE_IS_EMPTY(&pThrd->msg)) {
    queue* head = QUEUE_HEAD(&pThrd->msg);
    pMsg = QUEUE_DATA(head, SCliMsg, q);
    QUEUE_REMOVE(head);
  }
  pthread_mutex_unlock(&pThrd->msgMtx);

  // BUG FIX: the original dereferenced pMsg->context unconditionally, which
  // is a NULL dereference on a spurious/coalesced wakeup with an empty queue.
  if (pMsg == NULL) {
    return;
  }

  SEpSet*  pEpSet = &pMsg->context->epSet;
  char*    fqdn = pEpSet->fqdn[pEpSet->inUse];
  uint32_t port = pEpSet->port[pEpSet->inUse];

  SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port);
  if (conn != NULL) {
    // impl later
  } else {
    // No cached connection: allocate one and kick off the connect.
    // (Renamed from the original inner 'conn', which shadowed the outer one.)
    SCliConn* newConn = malloc(sizeof(SCliConn));
    if (newConn == NULL) {
      return;  // OOM: drop for now; error propagation to the caller impl later
    }
    newConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
    if (newConn->stream == NULL) {
      free(newConn);
      return;
    }
    uv_tcp_init(pThrd->loop, (uv_tcp_t*)(newConn->stream));
    newConn->connReq.data = newConn;
    newConn->data = pMsg;

    struct sockaddr_in addr;
    uv_ip4_addr(fqdn, port, &addr);
    // handle error in callback if connect error
    uv_tcp_connect(&newConn->connReq, (uv_tcp_t*)(newConn->stream), (const struct sockaddr*)&addr, clientConnCb);
  }
  // SRpcReqContext* pCxt = pMsg->context;
  // SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont);
  // char* msg = (char*)pHead;
  // int len = rpcMsgLenFromCont(pCtx->contLen);
  // tmsg_t msgType = pCtx->msgType;
  // impl later
}
// Per-client-thread entry point: run this thread's libuv event loop until
// it is stopped.
static void* clientThread(void* arg) {
  SCliThrdObj* pThrd = (SCliThrdObj*)arg;
  uv_run(pThrd->loop, UV_RUN_DEFAULT);
  // BUG FIX: the original fell off the end of a non-void function; reading
  // the thread's return value (e.g. via pthread_join) would be undefined.
  return NULL;
}
// Create the client-side transport object: one libuv loop + worker thread
// per requested thread, each with its own message queue and async waker.
// Returns the SClientObj handle, or NULL on allocation failure.
// Ownership: the returned object (and its threads) must be torn down by the
// matching cleanup path; 'shandle' is borrowed, not owned.
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
  SClientObj* cli = calloc(1, sizeof(SClientObj));
  if (cli == NULL) {
    return NULL;
  }

  // BUG FIX: the original copied strlen(label) bytes unbounded, which can
  // overflow cli->label for long labels. Clamp to the field size; calloc
  // above guarantees NUL termination.
  // NOTE(review): assumes cli->label is a fixed-size char array — confirm
  // its declaration in the transport headers.
  size_t labelLen = strlen(label);
  if (labelLen >= sizeof(cli->label)) {
    labelLen = sizeof(cli->label) - 1;
  }
  memcpy(cli->label, label, labelLen);

  cli->numOfThreads = numOfThreads;
  cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
  if (cli->pThreadObj == NULL) {
    free(cli);
    return NULL;
  }

  for (int i = 0; i < cli->numOfThreads; i++) {
    SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
    QUEUE_INIT(&pThrd->msg);
    pthread_mutex_init(&pThrd->msgMtx, NULL);

    // QUEUE_INIT(&pThrd->clientCache);

    pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
    uv_loop_init(pThrd->loop);

    // Async handle lets other threads wake this loop after queueing a message.
    pThrd->cliAsync = malloc(sizeof(uv_async_t));
    uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
    pThrd->cliAsync->data = pThrd;

    pThrd->shandle = shandle;
    int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
    if (err == 0) {
      tDebug("success to create tranport-client thread %d", i);
    } else {
      // BUG FIX: the original silently ignored pthread_create failure.
      tError("failed to create tranport-client thread %d, err:%d", i, err);
    }
    cli->pThreadObj[i] = pThrd;
  }
  return cli;
}
// Queue an outgoing request onto one of the client worker threads and wake
// that thread's event loop. The request context is stored in headroom that
// lives immediately before the RPC head inside the caller-allocated message
// buffer (see the pointer arithmetic below).
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
  // impl later
  SRpcInfo* pRpc = (SRpcInfo*)shandle;

  // Compress the body in place; len is the (possibly reduced) content length.
  int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);

  SRpcReqContext* pContext;
  // NOTE(review): assumes pCont was allocated with
  // sizeof(SRpcReqContext) + sizeof(SRpcHead) bytes of headroom in front of
  // it — confirm against the content-allocation site.
  pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
  pContext->ahandle = pMsg->ahandle;
  pContext->pRpc = (SRpcInfo*)shandle;
  pContext->epSet = *pEpSet;
  pContext->contLen = len;
  pContext->pCont = pMsg->pCont;
  pContext->msgType = pMsg->msgType;
  pContext->oldInUse = pEpSet->inUse;

  assert(pRpc->connType == TAOS_CONN_CLIENT);

  // atomic or not
  // NOTE(review): this read-modify-write of pRpc->index is not atomic, so
  // concurrent senders can race. It only affects round-robin fairness, but
  // the intended thread model should be confirmed.
  int64_t index = pRpc->index;
  if (pRpc->index++ >= pRpc->numOfThreads) {
    pRpc->index = 0;
  }

  SCliMsg* msg = malloc(sizeof(SCliMsg));
  msg->context = pContext;

  // Pick the worker thread, enqueue under its lock, then wake its loop.
  SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];

  pthread_mutex_lock(&thrd->msgMtx);
  QUEUE_PUSH(&thrd->msg, &msg->q);
  pthread_mutex_unlock(&thrd->msgMtx);

  uv_async_send(thrd->cliAsync);
}
#endif
source/libs/transport/src/transComm.c
0 → 100644
浏览文件 @
1832a509
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include "transComm.h"
// Verify a message's MD5-based auth digest: expected = MD5(key|msg|key).
// Returns 0 when pAuth matches the recomputed digest, -1 otherwise.
int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
  T_MD5_CTX context;
  int       ret = -1;

  tMD5Init(&context);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Update(&context, (uint8_t*)pMsg, msgLen);
  tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
  tMD5Final(&context);

  // SECURITY FIX: compare in constant time instead of memcmp, so the check
  // does not leak how many leading digest bytes matched (timing side channel
  // on an authentication decision). Result is identical to the memcmp==0 test.
  const uint8_t* expect = context.digest;
  const uint8_t* given = (const uint8_t*)pAuth;
  uint8_t        diff = 0;
  for (int i = 0; i < (int)sizeof(context.digest); i++) {
    diff |= (uint8_t)(expect[i] ^ given[i]);
  }
  if (diff == 0) ret = 0;

  return ret;
}
// Compute the MD5-based auth digest for an outgoing message and write it to
// pAuth. The digest is MD5(key | msg | key), so both sides must share pKey
// (TSDB_PASSWORD_LEN bytes) to produce/verify the same value.
void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
  T_MD5_CTX ctx;

  tMD5Init(&ctx);
  tMD5Update(&ctx, (uint8_t*)pKey, TSDB_PASSWORD_LEN);  // leading key
  tMD5Update(&ctx, (uint8_t*)pMsg, msgLen);             // message body
  tMD5Update(&ctx, (uint8_t*)pKey, TSDB_PASSWORD_LEN);  // trailing key
  tMD5Final(&ctx);

  memcpy(pAuth, ctx.digest, sizeof(ctx.digest));
}
// Compress an RPC message body in place with LZ4 when it is large enough
// (NEEDTO_COMPRESSS_MSG). On success the SRpcComp prefix records the original
// length (reserved=0, contLen in network order) and pHead->comp is set.
// Returns the new content length (compLen + sizeof(SRpcComp)), or the
// original contLen when compression is skipped, fails, or does not help.
int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
  SRpcHead* pHead = rpcHeadFromCont(pCont);
  int32_t   finalLen = 0;
  int       overhead = sizeof(SRpcComp);

  if (!NEEDTO_COMPRESSS_MSG(contLen)) {
    return contLen;
  }

  char* buf = malloc(contLen + overhead + 8);  // 8 extra bytes of slack for the compressor
  if (buf == NULL) {
    tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
    return contLen;  // fall back to sending uncompressed
  }

  int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);

  /*
   * only the compressed size is less than the value of contLen - overhead, the compression is applied
   * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
   */
  if (compLen > 0 && compLen < contLen - overhead) {
    SRpcComp* pComp = (SRpcComp*)pCont;
    pComp->reserved = 0;
    pComp->contLen = htonl(contLen);
    memcpy(pCont + overhead, buf, compLen);

    pHead->comp = 1;
    // FIX: the original emitted this identical tDebug line twice (once
    // unconditionally before the check, once here); keep the single log on
    // the success path only.
    tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
    finalLen = compLen + overhead;
  } else {
    finalLen = contLen;
  }

  free(buf);
  return finalLen;
}
// Decompress an RPC message whose head carries the comp flag. Returns a
// freshly allocated head (with SRpcReqContext headroom reserved in front)
// holding the decompressed content, or the original pHead unchanged when the
// message is not compressed — or when allocation fails, in which case the
// caller silently receives the still-compressed message.
SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
  int       overhead = sizeof(SRpcComp);
  SRpcHead* pNewHead = NULL;
  uint8_t*  pCont = pHead->content;
  SRpcComp* pComp = (SRpcComp*)pHead->content;

  if (pHead->comp) {
    // decompress the content
    assert(pComp->reserved == 0);
    // Original (uncompressed) length was stored in network order at compress time.
    int contLen = htonl(pComp->contLen);

    // prepare the temporary buffer to decompress message
    char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD);
    // NOTE(review): if malloc returned NULL, 'temp + sizeof(SRpcReqContext)'
    // is undefined pointer arithmetic before the NULL test below — the check
    // should be on 'temp' itself.
    pNewHead = (SRpcHead*)(temp + sizeof(SRpcReqContext));  // reserve SRpcReqContext
    if (pNewHead) {
      int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
      int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen);
      assert(origLen == contLen);

      // Copy the header after decompressing the body, then fix up the length.
      memcpy(pNewHead, pHead, sizeof(SRpcHead));
      pNewHead->msgLen = rpcMsgLenFromCont(origLen);
      // NOTE(review): freeing the compressed buffer is disabled below, so the
      // original message appears to leak here — confirm who owns/frees it.
      /// rpcFreeMsg(pHead);  // free the compressed message buffer
      pHead = pNewHead;
      tTrace("decomp malloc mem:%p", temp);
    } else {
      tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
    }
  }
  return pHead;
}
#endif
source/libs/transport/src/trans
port
.c
→
source/libs/transport/src/trans
Srv
.c
浏览文件 @
1832a509
此差异已折叠。
点击以展开。
source/libs/transport/test/rclient.c
浏览文件 @
1832a509
...
...
@@ -34,7 +34,8 @@ typedef struct {
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
ahandle
;
tDebug
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pMsg
->
code
);
tDebug
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pMsg
->
code
);
if
(
pEpSet
)
pInfo
->
epSet
=
*
pEpSet
;
...
...
@@ -185,7 +186,8 @@ int main(int argc, char *argv[]) {
// float usedTime = (endTime - startTime) / 1000.0f; // mseconds
// tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
// tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
// tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime,
// msgSize);
int
ch
=
getchar
();
UNUSED
(
ch
);
...
...
source/libs/transport/test/transportTests.cc
浏览文件 @
1832a509
...
...
@@ -22,6 +22,7 @@
#include <thread>
#include <vector>
#include "transComm.h"
#include "transportInt.h"
#include "trpc.h"
...
...
@@ -46,7 +47,7 @@ class QueueObj {
if
(
!
IsEmpty
())
{
queue
*
h
=
QUEUE_HEAD
(
&
head
);
el
=
QUEUE_DATA
(
h
,
QueueElem
,
q
);
QUEUE_REMOVE
(
&
el
->
q
);
QUEUE_REMOVE
(
h
);
}
return
el
;
}
...
...
source/util/src/terror.c
浏览文件 @
1832a509
...
...
@@ -360,6 +360,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling")
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_DROPPING
,
"Task dropping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_DUPLICATTED_OPERATION
,
"Duplicatted operation"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_MSG_ERROR
,
"Task message error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_JOB_FREED
,
"Job already freed"
)
...
...
tests/script/sh/massiveTable/deployCluster.sh
浏览文件 @
1832a509
...
...
@@ -6,19 +6,19 @@ set -e
#set -x
# deployCluster.sh
curr_dir
=
$(
readlink
-f
"
$(
dirname
"
$0
"
)
"
)
echo
$curr_dir
curr_dir
=
$(
pwd
)
${
curr_dir
}
/cleanCluster.sh
-r
"/data"
${
curr_dir
}
/cleanCluster.sh
-r
"/data2"
source
./cleanCluster.sh
-r
/data
source
./cleanCluster.sh
-r
/data2
${
curr_dir
}
/compileVersion.sh
-r
${
curr_dir
}
/../../../../
-v
"3.0"
source
./compileVersion.sh
-r
${
curr_dir
}
/../../../../
-v
"3.0"
${
curr_dir
}
/setupDnodes.sh
-r
"/data"
-n
1
-f
"trd02:7000"
-p
7000
${
curr_dir
}
/setupDnodes.sh
-r
"/data2"
-n
1
-f
"trd02:7000"
-p
8000
source
./setupDnodes.sh
-r
/data
-n
1
-f
trd02:7000
-p
7000
source
./setupDnodes.sh
-r
/data2
-n
1
-f
trd02:7000
-p
8000
#source ./setupDnodes.sh -r /data -n 2 -f trd02:7000 -p 7000
#source ./setupDnodes.sh -r /data2 -n 2 -f trd02:7000 -p 8000
#./setupDnodes.sh -r "/data" -n 2 -f trd02:7000 -p 7000
#./setupDnodes.sh -r "/data2" -n 2 -f trd02:7000 -p 8000
...
...
tests/script/sh/massiveTable/setupDnodes.sh
浏览文件 @
1832a509
...
...
@@ -94,7 +94,7 @@ createNewDnodesDataDir() {
mkdir
-p
${
dataRootDir
}
/dnode_
${
i
}
/data
createNewCfgFile
${
dataRootDir
}
/dnode_
${
i
}
/cfg
${
dataRootDir
}
/dnode_
${
i
}
/data
${
dataRootDir
}
/dnode_
${
i
}
/log
${
firstEp
}
${
serverPort
}
echo
"create dnode:
${
serverPort
}
,
${
dataRootDir
}
/dnode_
${
i
}
"
#
echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}"
serverPort
=
$((
10
#${serverPort}+100))
done
}
...
...
@@ -121,7 +121,7 @@ startDnodes() {
############################### main process ##########################################
## kill all taosd process
kill_process taosd
#
kill_process taosd
## create director for all dnode
if
[[
"
$enviMode
"
==
"new"
]]
;
then
...
...
@@ -131,6 +131,7 @@ fi
## start all dnode by nohup
startDnodes
${
dnodeNumber
}
echo
" run setupDnodes.sh end !!!"
echo
"====run setupDnodes.sh end===="
echo
" "
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录