Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
407db99a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
407db99a
编写于
6月 30, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/queryredirect' of
https://github.com/taosdata/TDengine
into redir
上级
25b3f88e
47bc3687
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
302 addition
and
124 deletion
+302
-124
include/common/tmsg.h
include/common/tmsg.h
+6
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+2
-2
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+18
-17
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+1
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+7
-0
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+27
-20
source/libs/qworker/src/qwDbg.c
source/libs/qworker/src/qwDbg.c
+58
-1
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+48
-0
source/libs/qworker/src/qwUtil.c
source/libs/qworker/src/qwUtil.c
+14
-14
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+33
-4
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+14
-13
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+62
-47
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+9
-6
未找到文件。
include/common/tmsg.h
浏览文件 @
407db99a
...
...
@@ -1494,6 +1494,7 @@ typedef struct SSubQueryMsg {
uint64_t
queryId
;
uint64_t
taskId
;
int64_t
refId
;
int32_t
execId
;
int8_t
taskType
;
int8_t
explain
;
uint32_t
sqlLen
;
// the query sql,
...
...
@@ -1513,6 +1514,7 @@ typedef struct {
uint64_t
sId
;
uint64_t
queryId
;
uint64_t
taskId
;
int32_t
execId
;
}
SQueryContinueReq
;
typedef
struct
{
...
...
@@ -1534,6 +1536,7 @@ typedef struct {
uint64_t
sId
;
uint64_t
queryId
;
uint64_t
taskId
;
int32_t
execId
;
}
SResFetchReq
;
typedef
struct
{
...
...
@@ -1545,6 +1548,7 @@ typedef struct {
uint64_t
queryId
;
uint64_t
taskId
;
int64_t
refId
;
int32_t
execId
;
int8_t
status
;
}
STaskStatus
;
...
...
@@ -1590,6 +1594,7 @@ typedef struct {
uint64_t
queryId
;
uint64_t
taskId
;
int64_t
refId
;
int32_t
execId
;
}
STaskCancelReq
;
typedef
struct
{
...
...
@@ -1602,6 +1607,7 @@ typedef struct {
uint64_t
queryId
;
uint64_t
taskId
;
int64_t
refId
;
int32_t
execId
;
}
STaskDropReq
;
typedef
struct
{
...
...
include/libs/nodes/plannodes.h
浏览文件 @
407db99a
...
...
@@ -346,6 +346,7 @@ typedef struct SDownstreamSourceNode {
SQueryNodeAddr
addr
;
uint64_t
taskId
;
uint64_t
schedId
;
int32_t
execId
;
}
SDownstreamSourceNode
;
typedef
struct
SExchangePhysiNode
{
...
...
source/client/src/clientImpl.c
浏览文件 @
407db99a
...
...
@@ -1294,10 +1294,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
*/
int32_t
elapsed
=
pRequest
->
metric
.
rsp
-
pRequest
->
metric
.
start
;
if
(
pMsg
->
code
==
TSDB_CODE_SUCCESS
)
{
tscDebug
(
"0x%"
PRIx64
"
message
:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
tscDebug
(
"0x%"
PRIx64
"
rsp msg
:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
/
1000
,
pRequest
->
requestId
);
}
else
{
tscError
(
"0x%"
PRIx64
"
SQL cmd
:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
tscError
(
"0x%"
PRIx64
"
rsp msg
:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"
PRIx64
,
pRequest
->
self
,
TMSG_INFO
(
pMsg
->
msgType
),
tstrerror
(
pMsg
->
code
),
pMsg
->
contLen
,
elapsed
/
1000
,
pRequest
->
requestId
);
}
...
...
source/common/src/tmsg.c
浏览文件 @
407db99a
...
...
@@ -4312,6 +4312,7 @@ int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pR
if
(
tEncodeU64
(
&
encoder
,
status
->
queryId
)
<
0
)
return
-
1
;
if
(
tEncodeU64
(
&
encoder
,
status
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
status
->
refId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
status
->
execId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
status
->
status
)
<
0
)
return
-
1
;
}
}
else
{
...
...
@@ -4342,6 +4343,7 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *
if
(
tDecodeU64
(
&
decoder
,
&
status
.
queryId
)
<
0
)
return
-
1
;
if
(
tDecodeU64
(
&
decoder
,
&
status
.
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
status
.
refId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
status
.
execId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
status
.
status
)
<
0
)
return
-
1
;
taosArrayPush
(
pRsp
->
taskStatus
,
&
status
);
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
407db99a
...
...
@@ -2010,13 +2010,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
ASSERT
(
pDataInfo
->
status
==
EX_SOURCE_DATA_NOT_READY
);
qDebug
(
"%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%"
PRIx64
", %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
addr
.
epSet
.
eps
[
0
].
fqdn
,
pSource
->
taskId
,
sourceIndex
,
totalSources
);
qDebug
(
"%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%"
PRIx64
",
execId:%d,
%d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
addr
.
epSet
.
eps
[
0
].
fqdn
,
pSource
->
taskId
,
pSource
->
execId
,
sourceIndex
,
totalSources
);
pMsg
->
header
.
vgId
=
htonl
(
pSource
->
addr
.
nodeId
);
pMsg
->
sId
=
htobe64
(
pSource
->
schedId
);
pMsg
->
taskId
=
htobe64
(
pSource
->
taskId
);
pMsg
->
queryId
=
htobe64
(
pTaskInfo
->
id
.
queryId
);
pMsg
->
execId
=
htonl
(
pSource
->
execId
);
// send the fetch remote task result reques
SMsgSendInfo
*
pMsgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
...
...
@@ -2145,9 +2146,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SSDataBlock
*
pRes
=
pExchangeInfo
->
pResult
;
SLoadRemoteDataInfo
*
pLoadInfo
=
&
pExchangeInfo
->
loadInfo
;
if
(
pRsp
->
numOfRows
==
0
)
{
qDebug
(
"%s vgId:%d, taskId:0x%"
PRIx64
" index:%d completed, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
qDebug
(
"%s vgId:%d, taskId:0x%"
PRIx64
"
execId:%d
index:%d completed, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
", completed:%d try next %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
i
,
pDataInfo
->
totalRows
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
i
,
pDataInfo
->
totalRows
,
pExchangeInfo
->
loadInfo
.
totalRows
,
completed
+
1
,
i
+
1
,
totalSources
);
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
completed
+=
1
;
...
...
@@ -2165,17 +2166,17 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
if
(
pRsp
->
completed
==
1
)
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" execId:%d"
" index:%d completed, numOfRows:%d, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
", completed:%d try next %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
i
,
pRes
->
info
.
rows
,
pDataInfo
->
totalRows
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
i
,
pRes
->
info
.
rows
,
pDataInfo
->
totalRows
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
,
completed
+
1
,
i
+
1
,
totalSources
);
completed
+=
1
;
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
}
else
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, totalRows:%"
PRIu64
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
"
execId:%d
numOfRows:%d, totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pRes
->
info
.
rows
,
pLoadInfo
->
totalRows
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
p
Source
->
execId
,
p
Res
->
info
.
rows
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
);
}
...
...
@@ -2249,8 +2250,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SDownstreamSourceNode
*
pSource
=
taosArrayGet
(
pExchangeInfo
->
pSources
,
pExchangeInfo
->
current
);
if
(
pDataInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s vgId:%d, taskID:0x%"
PRIx64
" error happens, code:%s"
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
tstrerror
(
pDataInfo
->
code
));
qError
(
"%s vgId:%d, taskID:0x%"
PRIx64
" e
xecId:%d e
rror happens, code:%s"
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pSource
->
execId
,
tstrerror
(
pDataInfo
->
code
));
pOperator
->
pTaskInfo
->
code
=
pDataInfo
->
code
;
return
NULL
;
}
...
...
@@ -2258,9 +2259,9 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp
*
pRsp
=
pDataInfo
->
pRsp
;
SLoadRemoteDataInfo
*
pLoadInfo
=
&
pExchangeInfo
->
loadInfo
;
if
(
pRsp
->
numOfRows
==
0
)
{
qDebug
(
"%s vgId:%d, taskID:0x%"
PRIx64
" %d of total completed, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
qDebug
(
"%s vgId:%d, taskID:0x%"
PRIx64
"
execId:%d
%d of total completed, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
" try next"
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pExchangeInfo
->
current
+
1
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
p
Source
->
execId
,
p
ExchangeInfo
->
current
+
1
,
pDataInfo
->
totalRows
,
pLoadInfo
->
totalRows
);
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
...
...
@@ -2276,17 +2277,17 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
pTableRsp
->
compLen
,
pTableRsp
->
numOfCols
,
startTs
,
&
pDataInfo
->
totalRows
,
NULL
);
if
(
pRsp
->
completed
==
1
)
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, rowsOfSource:%"
PRIu64
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
"
execId:%d
numOfRows:%d, rowsOfSource:%"
PRIu64
", totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
" try next %d/%"
PRIzu
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pRes
->
info
.
rows
,
pDataInfo
->
totalRows
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
p
Source
->
execId
,
p
Res
->
info
.
rows
,
pDataInfo
->
totalRows
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
,
pExchangeInfo
->
current
+
1
,
totalSources
);
pDataInfo
->
status
=
EX_SOURCE_DATA_EXHAUSTED
;
pExchangeInfo
->
current
+=
1
;
}
else
{
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
" numOfRows:%d, totalRows:%"
PRIu64
qDebug
(
"%s fetch msg rsp from vgId:%d, taskId:0x%"
PRIx64
"
execId:%d
numOfRows:%d, totalRows:%"
PRIu64
", totalBytes:%"
PRIu64
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
pRes
->
info
.
rows
,
pLoadInfo
->
totalRows
,
GET_TASKID
(
pTaskInfo
),
pSource
->
addr
.
nodeId
,
pSource
->
taskId
,
p
Source
->
execId
,
p
Res
->
info
.
rows
,
pLoadInfo
->
totalRows
,
pLoadInfo
->
totalSize
);
}
...
...
@@ -2378,7 +2379,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
}
for
(
int32_t
i
=
0
;
i
<
numOfSources
;
++
i
)
{
S
NodeListNode
*
pNode
=
(
SNodeList
Node
*
)
nodesListGetNode
((
SNodeList
*
)
pExNode
->
pSrcEndPoints
,
i
);
S
DownstreamSourceNode
*
pNode
=
(
SDownstreamSource
Node
*
)
nodesListGetNode
((
SNodeList
*
)
pExNode
->
pSrcEndPoints
,
i
);
taosArrayPush
(
pInfo
->
pSources
,
pNode
);
}
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
407db99a
...
...
@@ -582,6 +582,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre
COPY_OBJECT_FIELD
(
addr
,
sizeof
(
SQueryNodeAddr
));
COPY_SCALAR_FIELD
(
taskId
);
COPY_SCALAR_FIELD
(
schedId
);
COPY_SCALAR_FIELD
(
execId
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
407db99a
...
...
@@ -3430,6 +3430,7 @@ static int32_t jsonToSlotDescNode(const SJson* pJson, void* pObj) {
static
const
char
*
jkDownstreamSourceAddr
=
"Addr"
;
static
const
char
*
jkDownstreamSourceTaskId
=
"TaskId"
;
static
const
char
*
jkDownstreamSourceSchedId
=
"SchedId"
;
static
const
char
*
jkDownstreamSourceExecId
=
"ExecId"
;
static
int32_t
downstreamSourceNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SDownstreamSourceNode
*
pNode
=
(
const
SDownstreamSourceNode
*
)
pObj
;
...
...
@@ -3441,6 +3442,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDownstreamSourceSchedId
,
pNode
->
schedId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkDownstreamSourceExecId
,
pNode
->
execId
);
}
return
code
;
}
...
...
@@ -3455,6 +3459,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetUBigIntValue
(
pJson
,
jkDownstreamSourceSchedId
,
&
pNode
->
schedId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkDownstreamSourceExecId
,
&
pNode
->
execId
);
}
return
code
;
}
...
...
source/libs/qworker/inc/qwInt.h
浏览文件 @
407db99a
...
...
@@ -75,6 +75,7 @@ typedef struct SQWDebug {
bool
lockEnable
;
bool
statusEnable
;
bool
dumpEnable
;
bool
tmp
;
}
SQWDebug
;
extern
SQWDebug
gQWDebug
;
...
...
@@ -122,6 +123,7 @@ typedef struct SQWTaskCtx {
int8_t
taskType
;
int8_t
explain
;
int32_t
queryType
;
int32_t
execId
;
bool
queryFetched
;
bool
queryEnd
;
...
...
@@ -200,8 +202,8 @@ typedef struct SQWorkerMgmt {
int32_t
paramIdx
;
}
SQWorkerMgmt
;
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
#define QW_IDS() sId, qId, tId, rId
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
, int32_t eId
#define QW_IDS() sId, qId, tId, rId
, eId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n)
...
...
@@ -226,15 +228,18 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_READY(status) \
(status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qId, tId) \
do { \
*(uint64_t *)(id) = (qId); \
*(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \
#define QW_SET_QTID(id, qId, tId, eId) \
do { \
*(uint64_t *)(id) = (qId); \
*(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \
*(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)) = (eId); \
} while (0)
#define QW_GET_QTID(id, qId, tId) \
do { \
(qId) = *(uint64_t *)(id); \
(tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \
#define QW_GET_QTID(id, qId, tId, eId) \
do { \
(qId) = *(uint64_t *)(id); \
(tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \
(eId) = *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)); \
} while (0)
#define QW_ERR_RET(c) \
...
...
@@ -276,22 +281,22 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("Q
W:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, t
Id, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("Q
W:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, t
Id, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("Q
W:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, t
Id, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("Q
ID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, e
Id, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("Q
ID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, e
Id, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("Q
ID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, e
Id, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) \
qDebugL("Q
W:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, t
Id, __VA_ARGS__)
qDebugL("Q
ID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, e
Id, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("Q
W:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, t
Id)
#define QW_TASK_WLOG_E(param) qWarn("Q
W:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, t
Id)
#define QW_TASK_DLOG_E(param) qDebug("Q
W:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, t
Id)
#define QW_TASK_ELOG_E(param) qError("Q
ID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, e
Id)
#define QW_TASK_WLOG_E(param) qWarn("Q
ID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, e
Id)
#define QW_TASK_DLOG_E(param) qDebug("Q
ID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, tId, e
Id)
#define QW_SCH_TASK_ELOG(param, ...) \
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
" param, mgmt, sId, qId, t
Id, __VA_ARGS__)
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
,EID:%d " param, mgmt, sId, qId, tId, e
Id, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) \
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
" param, mgmt, sId, qId, t
Id, __VA_ARGS__)
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
,EID:%d " param, mgmt, sId, qId, tId, e
Id, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) \
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
" param, mgmt, sId, qId, t
Id, __VA_ARGS__)
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
,EID:%d " param, mgmt, sId, qId, tId, e
Id, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) \
do { \
...
...
@@ -365,6 +370,8 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void
qwDbgDumpMgmtInfo
(
SQWorker
*
mgmt
);
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
);
int32_t
qwDbgBuildAndSendRedirectRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
int32_t
code
,
SEpSet
*
pEpSet
);
int32_t
qwAddTaskCtx
(
QW_FPARAMS_DEF
);
#ifdef __cplusplus
...
...
source/libs/qworker/src/qwDbg.c
浏览文件 @
407db99a
...
...
@@ -9,7 +9,7 @@
#include "tmsg.h"
#include "tname.h"
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
false
};
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
false
,
.
tmp
=
true
};
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
)
{
if
(
!
gQWDebug
.
statusEnable
)
{
...
...
@@ -121,3 +121,60 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
}
int32_t
qwDbgBuildAndSendRedirectRsp
(
int32_t
rspType
,
SRpcHandleInfo
*
pConn
,
int32_t
code
,
SEpSet
*
pEpSet
)
{
int32_t
contLen
=
0
;
char
*
rsp
=
NULL
;
if
(
pEpSet
)
{
contLen
=
tSerializeSEpSet
(
NULL
,
0
,
pEpSet
);
rsp
=
rpcMallocCont
(
contLen
);
tSerializeSEpSet
(
rsp
,
contLen
,
pEpSet
);
}
SRpcMsg
rpcRsp
=
{
.
msgType
=
rspType
,
.
pCont
=
rsp
,
.
contLen
=
contLen
,
.
code
=
code
,
.
info
=
*
pConn
,
};
tmsgSendRsp
(
&
rpcRsp
);
qDebug
(
"response %s msg, code: %s"
,
TMSG_INFO
(
rspType
),
tstrerror
(
code
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwDbgEnableDebug
(
char
*
option
)
{
if
(
0
==
strcasecmp
(
option
,
"lock"
))
{
gQWDebug
.
lockEnable
=
true
;
qDebug
(
"qw lock debug enabled"
);
return
TSDB_CODE_SUCCESS
;
}
if
(
0
==
strcasecmp
(
option
,
"status"
))
{
gQWDebug
.
statusEnable
=
true
;
qDebug
(
"qw status debug enabled"
);
return
TSDB_CODE_SUCCESS
;
}
if
(
0
==
strcasecmp
(
option
,
"dump"
))
{
gQWDebug
.
dumpEnable
=
true
;
qDebug
(
"qw dump debug enabled"
);
return
TSDB_CODE_SUCCESS
;
}
if
(
0
==
strcasecmp
(
option
,
"tmp"
))
{
gQWDebug
.
tmp
=
true
;
qDebug
(
"qw tmp debug enabled"
);
return
TSDB_CODE_SUCCESS
;
}
qError
(
"invalid qw debug option:%s"
,
option
);
return
TSDB_CODE_APP_ERROR
;
}
source/libs/qworker/src/qwMsg.c
浏览文件 @
407db99a
...
...
@@ -156,6 +156,41 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendDropMsg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
)
{
STaskDropReq
*
req
=
(
STaskDropReq
*
)
rpcMallocCont
(
sizeof
(
STaskDropReq
));
if
(
NULL
==
req
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
STaskDropReq
));
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
req
->
header
.
vgId
=
mgmt
->
nodeId
;
req
->
sId
=
sId
;
req
->
queryId
=
qId
;
req
->
taskId
=
tId
;
req
->
refId
=
rId
;
req
->
execId
=
eId
;
SRpcMsg
pNewMsg
=
{
.
msgType
=
TDMT_SCH_DROP_TASK
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
STaskDropReq
),
.
code
=
0
,
.
info
=
*
pConn
,
};
int32_t
code
=
tmsgPutToQueue
(
&
mgmt
->
msgCb
,
FETCH_QUEUE
,
&
pNewMsg
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
QW_SCH_TASK_ELOG
(
"put drop task msg to queue failed, vgId:%d, code:%s"
,
mgmt
->
nodeId
,
tstrerror
(
code
));
rpcFreeCont
(
req
);
QW_ERR_RET
(
code
);
}
QW_SCH_TASK_DLOG
(
"drop task msg put to queue, vgId:%d"
,
mgmt
->
nodeId
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendCQueryMsg
(
QW_FPARAMS_DEF
,
SRpcHandleInfo
*
pConn
)
{
SQueryContinueReq
*
req
=
(
SQueryContinueReq
*
)
rpcMallocCont
(
sizeof
(
SQueryContinueReq
));
if
(
NULL
==
req
)
{
...
...
@@ -167,6 +202,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
req
->
sId
=
sId
;
req
->
queryId
=
qId
;
req
->
taskId
=
tId
;
req
->
execId
=
eId
;
SRpcMsg
pNewMsg
=
{
.
msgType
=
TDMT_SCH_QUERY_CONTINUE
,
...
...
@@ -266,6 +302,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
msg
->
refId
=
be64toh
(
msg
->
refId
);
msg
->
execId
=
ntohl
(
msg
->
execId
);
msg
->
phyLen
=
ntohl
(
msg
->
phyLen
);
msg
->
sqlLen
=
ntohl
(
msg
->
sqlLen
);
...
...
@@ -273,6 +310,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
SQWMsg
qwMsg
=
{.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phyLen
,
.
connInfo
=
pMsg
->
info
};
...
...
@@ -295,6 +333,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
QW_SCH_TASK_DLOG
(
"Abort prerocessQuery start, handle:%p"
,
pMsg
->
info
.
handle
);
qwAbortPrerocessQuery
(
QW_FPARAMS
());
...
...
@@ -324,6 +363,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phyLen
,
.
connInfo
=
pMsg
->
info
,
.
msgType
=
pMsg
->
msgType
};
char
*
sql
=
strndup
(
msg
->
msg
,
msg
->
sqlLen
);
...
...
@@ -356,6 +396,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
0
;
int32_t
eId
=
msg
->
execId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connInfo
=
pMsg
->
info
};
...
...
@@ -387,11 +428,13 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
msg
->
execId
=
ntohl
(
msg
->
execId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
0
;
int32_t
eId
=
msg
->
execId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connInfo
=
pMsg
->
info
};
...
...
@@ -437,11 +480,13 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
msg
->
refId
=
be64toh
(
msg
->
refId
);
msg
->
execId
=
ntohl
(
msg
->
execId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connInfo
=
pMsg
->
info
};
...
...
@@ -476,11 +521,13 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
msg
->
refId
=
be64toh
(
msg
->
refId
);
msg
->
execId
=
ntohl
(
msg
->
execId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
int32_t
eId
=
msg
->
execId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
code
=
pMsg
->
code
,
.
connInfo
=
pMsg
->
info
};
...
...
@@ -553,6 +600,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SR
uint64_t
qId
=
req
.
queryId
;
uint64_t
tId
=
req
.
taskId
;
int64_t
rId
=
0
;
int32_t
eId
=
-
1
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
req
.
msg
,
.
msgLen
=
req
.
phyLen
,
.
connInfo
=
pMsg
->
info
};
QW_SCH_TASK_DLOG
(
"processDelete start, node:%p, handle:%p, sql:%s"
,
node
,
pMsg
->
info
.
handle
,
req
.
sql
);
...
...
source/libs/qworker/src/qwUtil.c
浏览文件 @
407db99a
...
...
@@ -135,8 +135,8 @@ int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchS
void
qwReleaseScheduler
(
int32_t
rwType
,
SQWorker
*
mgmt
)
{
QW_UNLOCK
(
rwType
,
&
mgmt
->
schLock
);
}
int32_t
qwAcquireTaskStatus
(
QW_FPARAMS_DEF
,
int32_t
rwType
,
SQWSchStatus
*
sch
,
SQWTaskStatus
**
task
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)
+
sizeof
(
eId
)
]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
,
eId
);
QW_LOCK
(
rwType
,
&
sch
->
tasksLock
);
*
task
=
taosHashGet
(
sch
->
tasksHash
,
id
,
sizeof
(
id
));
...
...
@@ -151,8 +151,8 @@ int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, S
int32_t
qwAddTaskStatusImpl
(
QW_FPARAMS_DEF
,
SQWSchStatus
*
sch
,
int32_t
rwType
,
int32_t
status
,
SQWTaskStatus
**
task
)
{
int32_t
code
=
0
;
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)
+
sizeof
(
eId
)
]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
,
eId
);
SQWTaskStatus
ntask
=
{
0
};
ntask
.
status
=
status
;
...
...
@@ -207,8 +207,8 @@ int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch
void
qwReleaseTaskStatus
(
int32_t
rwType
,
SQWSchStatus
*
sch
)
{
QW_UNLOCK
(
rwType
,
&
sch
->
tasksLock
);
}
int32_t
qwAcquireTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)
+
sizeof
(
eId
)
]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
,
eId
);
*
ctx
=
taosHashAcquire
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
ctx
))
{
...
...
@@ -220,8 +220,8 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
}
int32_t
qwGetTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)
+
sizeof
(
eId
)
]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
,
eId
);
*
ctx
=
taosHashGet
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
ctx
))
{
...
...
@@ -233,8 +233,8 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
}
int32_t
qwAddTaskCtxImpl
(
QW_FPARAMS_DEF
,
bool
acquire
,
SQWTaskCtx
**
ctx
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)
+
sizeof
(
eId
)
]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
,
eId
);
SQWTaskCtx
nctx
=
{
0
};
...
...
@@ -314,8 +314,8 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
}
int32_t
qwDropTaskCtx
(
QW_FPARAMS_DEF
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)
+
sizeof
(
eId
)
]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
,
eId
);
SQWTaskCtx
octx
;
SQWTaskCtx
*
ctx
=
taosHashGet
(
mgmt
->
ctxHash
,
id
,
sizeof
(
id
));
...
...
@@ -348,8 +348,8 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)
+
sizeof
(
eId
)
]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
,
eId
);
if
(
qwAcquireScheduler
(
mgmt
,
sId
,
QW_WRITE
,
&
sch
))
{
QW_TASK_WLOG_E
(
"scheduler does not exist"
);
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
407db99a
...
...
@@ -168,7 +168,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
// TODO GET EXECUTOR API TO GET MORE INFO
QW_GET_QTID
(
key
,
status
.
queryId
,
status
.
taskId
);
QW_GET_QTID
(
key
,
status
.
queryId
,
status
.
taskId
,
status
.
execId
);
status
.
status
=
taskStatus
->
status
;
status
.
refId
=
taskStatus
->
refId
;
...
...
@@ -493,7 +493,9 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET
(
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS
(),
&
qwMsg
->
connInfo
));
QW_ERR_JRET
(
qwAddAcquireTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_ERR_JRET
(
qwAddTaskCtx
(
QW_FPARAMS
()));
QW_ERR_JRET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
ctx
->
ctrlConnInfo
=
qwMsg
->
connInfo
;
...
...
@@ -562,6 +564,35 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
ctx
,
NULL
));
}
if
(
gQWDebug
.
tmp
)
{
#if 1
if
(
TDMT_SCH_QUERY
==
qwMsg
->
msgType
)
{
SEpSet
epSet
=
{
0
};
epSet
.
inUse
=
1
;
epSet
.
numOfEps
=
3
;
strcpy
(
epSet
.
eps
[
0
].
fqdn
,
"localhost"
);
epSet
.
eps
[
0
].
port
=
7100
;
strcpy
(
epSet
.
eps
[
1
].
fqdn
,
"localhost"
);
epSet
.
eps
[
1
].
port
=
7200
;
strcpy
(
epSet
.
eps
[
2
].
fqdn
,
"localhost"
);
epSet
.
eps
[
2
].
port
=
7300
;
qwDbgBuildAndSendRedirectRsp
(
qwMsg
->
msgType
+
1
,
&
qwMsg
->
connInfo
,
TSDB_CODE_RPC_REDIRECT
,
&
epSet
);
gQWDebug
.
tmp
=
false
;
return
TSDB_CODE_SUCCESS
;
}
#else
if
(
TDMT_SCH_MERGE_QUERY
==
qwMsg
->
msgType
)
{
ctx
->
phase
=
QW_PHASE_POST_QUERY
;
qwDbgBuildAndSendRedirectRsp
(
qwMsg
->
msgType
+
1
,
&
qwMsg
->
connInfo
,
TSDB_CODE_RPC_REDIRECT
,
NULL
);
gQWDebug
.
tmp
=
false
;
return
TSDB_CODE_SUCCESS
;
}
#endif
}
_return:
input
.
code
=
code
;
...
...
@@ -734,8 +765,6 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWTaskCtx
*
ctx
=
NULL
;
bool
locked
=
false
;
// TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED
QW_ERR_JRET
(
qwAcquireTaskCtx
(
QW_FPARAMS
(),
&
ctx
));
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
407db99a
...
...
@@ -125,7 +125,7 @@ typedef struct SSchTaskCallbackParam {
uint64_t
queryId
;
int64_t
refId
;
uint64_t
taskId
;
int32_t
execId
x
;
int32_t
execId
;
void
*
pTrans
;
}
SSchTaskCallbackParam
;
...
...
@@ -171,7 +171,7 @@ typedef struct SSchTask {
uint64_t
taskId
;
// task id
SRWLatch
lock
;
// task lock
int32_t
maxExecTimes
;
// task may exec times
int32_t
execId
x
;
// task current execute try index
int32_t
execId
;
// task current execute try index
SSchLevel
*
level
;
// level
SRWLatch
planLock
;
// task update plan lock
SSubplan
*
plan
;
// subplan
...
...
@@ -243,9 +243,9 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId
x
% SCH_TASK_MAX_EXEC_TIMES; \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us; \
if (0 == (_task)->execId
x
) { \
if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \
} \
} while (0)
...
...
@@ -253,7 +253,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_WAIT_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId
x
% SCH_TASK_MAX_EXEC_TIMES; \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.waitTime += us - (_task)->profile.execUseTime[idx]; \
} while (0)
...
...
@@ -261,12 +261,12 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_END_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId
x
% SCH_TASK_MAX_EXEC_TIMES; \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us - (_task)->profile.execUseTime[idx]; \
(_task)->profile.endTs = us; \
} while (0)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execId
x
% SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execId % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec)
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
...
...
@@ -274,6 +274,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)
...
...
@@ -318,13 +319,13 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
" param, pJob->queryId, SCH_TASK_ID(pTask),
__VA_ARGS__)
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
,EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),
__VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
" param, pJob->queryId, SCH_TASK_ID(pTask),
__VA_ARGS__)
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
,EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),
__VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) \
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
" param, pJob->queryId, SCH_TASK_ID(pTask),
__VA_ARGS__)
qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
,EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),
__VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
" param, pJob->queryId, SCH_TASK_ID(pTask),
__VA_ARGS__)
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "
,EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),
__VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
...
...
@@ -366,7 +367,7 @@ void schProcessOnDataFetched(SSchJob *job);
int32_t
schGetTaskInJob
(
SSchJob
*
pJob
,
uint64_t
taskId
,
SSchTask
**
pTask
);
void
schFreeRpcCtxVal
(
const
void
*
arg
);
int32_t
schMakeBrokenLinkVal
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcBrokenlinkVal
*
brokenVal
,
bool
isHb
);
int32_t
schAppendTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
execId
x
);
int32_t
schAppendTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
execId
);
int32_t
schExecStaticExplainJob
(
SSchedulerReq
*
pReq
,
int64_t
*
job
,
bool
sync
);
int32_t
schExecJobImpl
(
SSchedulerReq
*
pReq
,
SSchJob
*
pJob
,
bool
sync
);
int32_t
schUpdateJobStatus
(
SSchJob
*
pJob
,
int8_t
newStatus
);
...
...
@@ -378,7 +379,7 @@ int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes);
int32_t
schAsyncExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
);
int32_t
schFetchRows
(
SSchJob
*
pJob
);
int32_t
schAsyncFetchRows
(
SSchJob
*
pJob
);
int32_t
schUpdateTaskHandle
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
dropExecNode
,
void
*
handle
,
int32_t
execId
x
);
int32_t
schUpdateTaskHandle
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
dropExecNode
,
void
*
handle
,
int32_t
execId
);
int32_t
schProcessOnTaskStatusRsp
(
SQueryNodeEpId
*
pEpId
,
SArray
*
pStatusList
);
void
schFreeSMsgSendInfo
(
SMsgSendInfo
*
msgSendInfo
);
char
*
schGetOpStr
(
SCH_OP_TYPE
type
);
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
407db99a
...
...
@@ -28,7 +28,7 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("sch release jobId:0x
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
pTask
->
plan
=
pPlan
;
pTask
->
level
=
pLevel
;
pTask
->
execId
x
=
-
1
;
pTask
->
execId
=
-
1
;
pTask
->
maxExecTimes
=
SCH_TASK_MAX_EXEC_TIMES
;
pTask
->
timeoutUsec
=
SCH_DEFAULT_TASK_TIMEOUT_USEC
;
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
...
...
@@ -428,59 +428,59 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schAppendTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
execId
x
)
{
int32_t
schAppendTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
execId
)
{
SSchNodeInfo
nodeInfo
=
{.
addr
=
*
addr
,
.
handle
=
NULL
};
if
(
taosHashPut
(
pTask
->
execNodes
,
&
execId
x
,
sizeof
(
execIdx
),
&
nodeInfo
,
sizeof
(
nodeInfo
)))
{
if
(
taosHashPut
(
pTask
->
execNodes
,
&
execId
,
sizeof
(
execId
),
&
nodeInfo
,
sizeof
(
nodeInfo
)))
{
SCH_TASK_ELOG
(
"taosHashPut nodeInfo to execNodes failed, errno:%d"
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_TASK_DLOG
(
"task execNode added, execId
x:%d"
,
execIdx
);
SCH_TASK_DLOG
(
"task execNode added, execId
:%d"
,
execId
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schDropTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
*
handle
,
int32_t
execId
x
)
{
int32_t
schDropTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
*
handle
,
int32_t
execId
)
{
if
(
NULL
==
pTask
->
execNodes
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
taosHashRemove
(
pTask
->
execNodes
,
&
execId
x
,
sizeof
(
execIdx
)))
{
SCH_TASK_ELOG
(
"fail to remove execId
x %d from execNodeList"
,
execIdx
);
if
(
taosHashRemove
(
pTask
->
execNodes
,
&
execId
,
sizeof
(
execId
)))
{
SCH_TASK_ELOG
(
"fail to remove execId
%d from execNodeList"
,
execId
);
}
else
{
SCH_TASK_DLOG
(
"execId
x %d removed from execNodeList"
,
execIdx
);
SCH_TASK_DLOG
(
"execId
%d removed from execNodeList"
,
execId
);
}
if
(
execId
x
!=
pTask
->
execIdx
)
{
// ignore it
SCH_TASK_DLOG
(
"execId
x %d is not current execIdx %d"
,
execIdx
,
pTask
->
execIdx
);
if
(
execId
!=
pTask
->
execId
)
{
// ignore it
SCH_TASK_DLOG
(
"execId
%d is not current execId %d"
,
execId
,
pTask
->
execId
);
SCH_RET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schUpdateTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
*
handle
,
int32_t
execId
x
)
{
int32_t
schUpdateTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
*
handle
,
int32_t
execId
)
{
if
(
taosHashGetSize
(
pTask
->
execNodes
)
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
SSchNodeInfo
*
nodeInfo
=
taosHashGet
(
pTask
->
execNodes
,
&
execId
x
,
sizeof
(
execIdx
));
SSchNodeInfo
*
nodeInfo
=
taosHashGet
(
pTask
->
execNodes
,
&
execId
,
sizeof
(
execId
));
nodeInfo
->
handle
=
handle
;
SCH_TASK_DLOG
(
"handle updated to %p for execId
x %d"
,
handle
,
execIdx
);
SCH_TASK_DLOG
(
"handle updated to %p for execId
%d"
,
handle
,
execId
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schUpdateTaskHandle
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
dropExecNode
,
void
*
handle
,
int32_t
execId
x
)
{
int32_t
schUpdateTaskHandle
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
dropExecNode
,
void
*
handle
,
int32_t
execId
)
{
if
(
dropExecNode
)
{
SCH_RET
(
schDropTaskExecNode
(
pJob
,
pTask
,
handle
,
execId
x
));
SCH_RET
(
schDropTaskExecNode
(
pJob
,
pTask
,
handle
,
execId
));
}
SCH_SET_TASK_HANDLE
(
pTask
,
handle
);
schUpdateTaskExecNode
(
pJob
,
pTask
,
handle
,
execId
x
);
schUpdateTaskExecNode
(
pJob
,
pTask
,
handle
,
execId
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -828,9 +828,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
}
}
if
((
pTask
->
execId
x
+
1
)
>=
pTask
->
maxExecTimes
)
{
if
((
pTask
->
execId
+
1
)
>=
pTask
->
maxExecTimes
)
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry since reach max try times, execId
x:%d"
,
pTask
->
execIdx
);
SCH_TASK_DLOG
(
"task no more retry since reach max try times, execId
:%d"
,
pTask
->
execId
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -841,9 +841,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
}
if
(
SCH_IS_DATA_SRC_TASK
(
pTask
))
{
if
((
pTask
->
execId
x
+
1
)
>=
SCH_TASK_NUM_OF_EPS
(
&
pTask
->
plan
->
execNode
))
{
if
((
pTask
->
execId
+
1
)
>=
SCH_TASK_NUM_OF_EPS
(
&
pTask
->
plan
->
execNode
))
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry since all ep tried, execId
x:%d, epNum:%d"
,
pTask
->
execIdx
,
SCH_TASK_DLOG
(
"task no more retry since all ep tried, execId
:%d, epNum:%d"
,
pTask
->
execId
,
SCH_TASK_NUM_OF_EPS
(
&
pTask
->
plan
->
execNode
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -859,7 +859,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
}
*
needRetry
=
true
;
SCH_TASK_DLOG
(
"task need the %dth retry, errCode:%x - %s"
,
pTask
->
execId
x
+
1
,
errCode
,
tstrerror
(
errCode
));
SCH_TASK_DLOG
(
"task need the %dth retry, errCode:%x - %s"
,
pTask
->
execId
+
1
,
errCode
,
tstrerror
(
errCode
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1171,6 +1171,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SDownstreamSourceNode
source
=
{.
type
=
QUERY_NODE_DOWNSTREAM_SOURCE
,
.
taskId
=
pTask
->
taskId
,
.
schedId
=
schMgmt
.
sId
,
.
execId
=
pTask
->
execId
,
.
addr
=
pTask
->
succeedAddr
};
qSetSubplanExecutionNode
(
parent
->
plan
,
pTask
->
plan
->
id
.
groupId
,
&
source
);
SCH_UNLOCK
(
SCH_WRITE
,
&
parent
->
lock
);
...
...
@@ -1256,7 +1257,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
SCH_LOCK_TASK
(
pTask
);
if
(
SCH_TASK_TIMEOUT
(
pTask
)
&&
JOB_TASK_STATUS_EXECUTING
==
pTask
->
status
&&
pJob
->
fetchTask
!=
pTask
&&
taosArrayGetSize
(
pTask
->
candidateAddrs
)
>
1
)
{
SCH_TASK_DLOG
(
"task execId
x %d will be rescheduled now"
,
pTask
->
execIdx
);
SCH_TASK_DLOG
(
"task execId
%d will be rescheduled now"
,
pTask
->
execId
);
schDropTaskOnExecNode
(
pJob
,
pTask
);
taosHashClear
(
pTask
->
execNodes
);
schProcessOnTaskFailure
(
pJob
,
pTask
,
TSDB_CODE_SCH_TIMEOUT_ERROR
);
...
...
@@ -1275,6 +1276,9 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
STaskStatus
*
taskStatus
=
taosArrayGet
(
pStatusList
,
i
);
qDebug
(
"QID:%"
PRIx64
",TID:0x%"
PRIx64
",EID:%d task status in server: %s"
,
taskStatus
->
queryId
,
taskStatus
->
taskId
,
taskStatus
->
execId
,
jobTaskStatusStr
(
taskStatus
->
status
));
SSchJob
*
pJob
=
schAcquireJob
(
taskStatus
->
refId
);
if
(
NULL
==
pJob
)
{
qWarn
(
"job not found, refId:0x%"
PRIx64
",QID:0x%"
PRIx64
",TID:0x%"
PRIx64
,
taskStatus
->
refId
,
...
...
@@ -1283,8 +1287,6 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
continue
;
}
SCH_JOB_DLOG
(
"TID:0x%"
PRIx64
" task status in server: %s"
,
taskStatus
->
taskId
,
jobTaskStatusStr
(
taskStatus
->
status
));
pTask
=
NULL
;
schGetTaskInJob
(
pJob
,
taskStatus
->
taskId
,
&
pTask
);
if
(
NULL
==
pTask
)
{
...
...
@@ -1292,6 +1294,13 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
schReleaseJob
(
taskStatus
->
refId
);
continue
;
}
if
(
taskStatus
->
execId
!=
pTask
->
execId
)
{
// TODO DROP TASK FROM SERVER!!!!
SCH_TASK_DLOG
(
"EID %d in hb rsp mis-match"
,
taskStatus
->
execId
);
schReleaseJob
(
taskStatus
->
refId
);
continue
;
}
if
(
taskStatus
->
status
==
JOB_TASK_STATUS_FAILED
)
{
// RECORD AND HANDLE ERROR!!!!
...
...
@@ -1362,9 +1371,9 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int32_t
code
=
0
;
atomic_add_fetch_32
(
&
pTask
->
level
->
taskLaunchedNum
,
1
);
pTask
->
execId
x
++
;
pTask
->
execId
++
;
SCH_TASK_DLOG
(
"start to launch task's %dth exec"
,
pTask
->
execId
x
);
SCH_TASK_DLOG
(
"start to launch task's %dth exec"
,
pTask
->
execId
);
SCH_LOG_TASK_START_TS
(
pTask
);
...
...
@@ -1677,10 +1686,14 @@ _return:
int32_t
schDoTaskRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_ELOG
(
"redirect will no continue cause of job status %s"
,
jobTaskStatusStr
(
status
));
SCH_RET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
if
((
pTask
->
execIdx
+
1
)
>=
pTask
->
maxExecTimes
)
{
SCH_TASK_DLOG
(
"task no more retry since reach max try times, execIdx:%d"
,
pTask
->
execIdx
);
SCH_UNLOCK_TASK
(
pTask
);
if
((
pTask
->
execId
+
1
)
>=
pTask
->
maxExecTimes
)
{
SCH_TASK_DLOG
(
"task no more retry since reach max try times, execId:%d"
,
pTask
->
execId
);
schProcessOnJobFailure
(
pJob
,
rspCode
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1703,34 +1716,36 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) {
SCH_ERR_JRET
(
schLaunchTasksInFlowCtrlList
(
pJob
,
pTask
));
}
}
}
else
{
pTask
->
childReady
=
0
;
int32_t
childrenNum
=
taosArrayGetSize
(
pTask
->
children
);
for
(
int32_t
i
=
0
;
i
<
childrenNum
;
++
i
)
{
SSchTask
*
pChild
=
taosArrayGetP
(
pTask
->
children
,
i
);
SCH_LOCK_TASK
(
pChild
);
code
=
schDoTaskRedirect
(
pJob
,
pChild
,
rspCode
);
SCH_UNLOCK_TASK
(
pChild
);
SCH_ERR_JRET
(
code
);
}
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
qClearSubplanExecutionNode
(
pTask
->
plan
);
SCH_ERR_JRET
(
schLaunchTask
(
pJob
,
pTask
));
return
TSDB_CODE_SUCCESS
;
}
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
// merge plan
pTask
->
childReady
=
0
;
SCH_ERR_JRET
(
schLaunchTask
(
pJob
,
pTask
)
);
qClearSubplanExecutionNode
(
pTask
->
plan
);
SCH_UNLOCK_TASK
(
pTask
);
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
int32_t
childrenNum
=
taosArrayGetSize
(
pTask
->
children
);
for
(
int32_t
i
=
0
;
i
<
childrenNum
;
++
i
)
{
SSchTask
*
pChild
=
taosArrayGetP
(
pTask
->
children
,
i
);
SCH_LOCK_TASK
(
pChild
);
schDoTaskRedirect
(
pJob
,
pChild
,
rspCode
);
SCH_UNLOCK_TASK
(
pChild
);
}
return
TSDB_CODE_SUCCESS
;
_return:
code
=
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
);
SCH_UNLOCK_TASK
(
pTask
);
SCH_RET
(
code
);
}
...
...
@@ -1747,7 +1762,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
SCH_ERR_JRET
(
schUpdateTaskCandidateAddr
(
pJob
,
pTask
,
pData
->
pEpSet
));
}
schDoTaskRedirect
(
pJob
,
pTask
,
rspCode
);
SCH_RET
(
schDoTaskRedirect
(
pJob
,
pTask
,
rspCode
)
);
_return:
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
407db99a
...
...
@@ -379,13 +379,13 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SCH_TASK_DLOG
(
"rsp msg received, type:%s, handle:%p, code:%s"
,
TMSG_INFO
(
msgType
),
pMsg
->
handle
,
tstrerror
(
rspCode
));
if
(
pParam
->
execId
x
!=
pTask
->
execIdx
)
{
SCH_TASK_DLOG
(
"execId
x %d mis-match current execIdx %d"
,
pParam
->
execIdx
,
pTask
->
execIdx
);
if
(
pParam
->
execId
!=
pTask
->
execId
)
{
SCH_TASK_DLOG
(
"execId
%d mis-match current execId %d"
,
pParam
->
execId
,
pTask
->
execId
);
goto
_return
;
}
bool
dropExecNode
=
(
msgType
==
TDMT_SCH_LINK_BROKEN
||
SCH_NETWORK_ERR
(
rspCode
));
SCH_ERR_JRET
(
schUpdateTaskHandle
(
pJob
,
pTask
,
dropExecNode
,
pMsg
->
handle
,
pParam
->
execId
x
));
SCH_ERR_JRET
(
schUpdateTaskHandle
(
pJob
,
pTask
,
dropExecNode
,
pMsg
->
handle
,
pParam
->
execId
));
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
...
...
@@ -401,7 +401,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
goto
_return
;
}
SCH_ERR_JRET
(
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
)
);
code
=
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
);
pMsg
->
pData
=
NULL
;
_return:
...
...
@@ -458,7 +458,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bo
param
->
refId
=
pJob
->
refId
;
param
->
taskId
=
SCH_TASK_ID
(
pTask
);
param
->
pTrans
=
pJob
->
conn
.
pTrans
;
param
->
execId
x
=
pTask
->
execIdx
;
param
->
execId
=
pTask
->
execId
;
*
pParam
=
param
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1015,6 +1015,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
refId
=
htobe64
(
pJob
->
refId
);
pMsg
->
execId
=
htonl
(
pTask
->
execId
);
pMsg
->
taskType
=
TASK_TYPE_TEMP
;
pMsg
->
explain
=
SCH_IS_EXPLAIN_JOB
(
pJob
);
pMsg
->
phyLen
=
htonl
(
pTask
->
msgLen
);
...
...
@@ -1041,6 +1042,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
execId
=
htonl
(
pTask
->
execId
);
break
;
}
...
...
@@ -1060,6 +1062,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
refId
=
htobe64
(
pJob
->
refId
);
pMsg
->
execId
=
htonl
(
pTask
->
execId
);
break
;
}
case
TDMT_SCH_QUERY_HEARTBEAT
:
{
...
...
@@ -1102,7 +1105,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
(
rpcCtx
.
args
?
&
rpcCtx
:
NULL
)));
if
(
msgType
==
TDMT_SCH_QUERY
||
msgType
==
TDMT_SCH_MERGE_QUERY
)
{
SCH_ERR_RET
(
schAppendTaskExecNode
(
pJob
,
pTask
,
addr
,
pTask
->
execId
x
));
SCH_ERR_RET
(
schAppendTaskExecNode
(
pJob
,
pTask
,
addr
,
pTask
->
execId
));
}
return
TSDB_CODE_SUCCESS
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录