Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
eced27c4
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
eced27c4
编写于
3月 15, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add uuid
上级
31ba02d5
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
447 addition
and
349 deletion
+447
-349
include/util/tuuid.h
include/util/tuuid.h
+39
-0
source/dnode/snode/inc/sndInt.h
source/dnode/snode/inc/sndInt.h
+7
-7
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+11
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+331
-342
source/util/src/tuuid.c
source/util/src/tuuid.c
+59
-0
未找到文件。
include/util/tuuid.h
0 → 100644
浏览文件 @
eced27c4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taoserror.h"
#include "thash.h"
/**
* Generate an non-negative signed 32bit id
*+------------+-----+-----------+---------------+
*| uid|localIp| PId | timestamp | serial number |
*+------------+-----+-----------+---------------+
*| 6bit |6bit | 12bit | 8bit |
*+------------+-----+-----------+---------------+
* @return
*/
int32_t
tGenIdPI32
(
void
);
/**
* Generate an non-negative signed 64bit id
*+------------+-----+-----------+---------------+
*| uid|localIp| PId | timestamp | serial number |
*+------------+-----+-----------+---------------+
*| 12bit |12bit|24bit |16bit |
*+------------+-----+-----------+---------------+
* @return
*/
int64_t
tGenIdPI64
(
void
);
source/dnode/snode/inc/sndInt.h
浏览文件 @
eced27c4
...
...
@@ -54,18 +54,18 @@ typedef struct SSnode {
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
int32_t
IdxInLevel
;
int32_t
level
;
}
SStreamInfo
;
}
SStream
Task
Info
;
typedef
struct
{
SStreamInfo
meta
;
int8_t
status
;
void
*
executor
;
STaosQueue
*
queue
;
void
*
stateStore
;
SStreamTaskInfo
meta
;
int8_t
status
;
void
*
executor
;
void
*
stateStore
;
// storage handle
}
SStream
Runner
;
}
SStream
Task
;
int32_t
sndCreateStream
();
int32_t
sndDropStream
();
...
...
source/dnode/snode/src/snode.c
浏览文件 @
eced27c4
...
...
@@ -14,6 +14,7 @@
*/
#include "sndInt.h"
#include "tuuid.h"
SSnode
*
sndOpen
(
const
char
*
path
,
const
SSnodeOpt
*
pOption
)
{
SSnode
*
pSnode
=
calloc
(
1
,
sizeof
(
SSnode
));
...
...
@@ -32,6 +33,16 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
void
sndDestroy
(
const
char
*
path
)
{}
static
int32_t
sndDeployTask
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
)
{
SStreamTask
*
task
=
malloc
(
sizeof
(
SStreamTask
));
if
(
task
==
NULL
)
{
return
-
1
;
}
task
->
meta
.
taskId
=
tGenIdPI32
();
taosHashPut
(
pSnode
->
pMeta
->
pHash
,
&
task
->
meta
.
taskId
,
sizeof
(
int32_t
),
&
task
,
sizeof
(
void
*
));
return
0
;
}
int32_t
sndProcessUMsg
(
SSnode
*
pSnode
,
SRpcMsg
*
pMsg
)
{
// stream deployment
// stream stop/resume
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
eced27c4
...
...
@@ -13,26 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "query.h"
#include "schedulerInt.h"
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
#include "tref.h"
SSchedulerMgmt
schMgmt
=
{
0
};
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
int32_t
schReleaseJob
(
int64_t
refId
)
{
return
taosReleaseRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
int32_t
schReleaseJob
(
int64_t
refId
)
{
return
taosReleaseRef
(
schMgmt
.
jobRef
,
refId
);
}
uint64_t
schGenTaskId
(
void
)
{
return
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
}
uint64_t
schGenTaskId
(
void
)
{
return
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
}
#if 0
uint64_t schGenUUID(void) {
static uint64_t hashId = 0;
static int32_t requestSerialId = 0;
...
...
@@ -54,11 +49,11 @@ uint64_t schGenUUID(void) {
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id;
}
#endif
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
pTask
->
plan
=
pPlan
;
pTask
->
level
=
pLevel
;
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
pTask
->
plan
=
pPlan
;
pTask
->
level
=
pLevel
;
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
pTask
->
taskId
=
schGenTaskId
();
pTask
->
execAddrs
=
taosArrayInit
(
SCH_MAX_CANDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
...
...
@@ -70,7 +65,7 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *
return
TSDB_CODE_SUCCESS
;
}
void
schFreeTask
(
SSchTask
*
pTask
)
{
void
schFreeTask
(
SSchTask
*
pTask
)
{
if
(
pTask
->
candidateAddrs
)
{
taosArrayDestroy
(
pTask
->
candidateAddrs
);
}
...
...
@@ -90,22 +85,20 @@ void schFreeTask(SSchTask* pTask) {
}
}
static
FORCE_INLINE
bool
schJobNeedToStop
(
SSchJob
*
pJob
,
int8_t
*
pStatus
)
{
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
pStatus
)
{
*
pStatus
=
status
;
}
return
(
status
==
JOB_TASK_STATUS_FAILED
||
status
==
JOB_TASK_STATUS_CANCELLED
||
status
==
JOB_TASK_STATUS_CANCELLING
||
status
==
JOB_TASK_STATUS_DROPPING
||
status
==
JOB_TASK_STATUS_SUCCEED
);
return
(
status
==
JOB_TASK_STATUS_FAILED
||
status
==
JOB_TASK_STATUS_CANCELLED
||
status
==
JOB_TASK_STATUS_CANCELLING
||
status
==
JOB_TASK_STATUS_DROPPING
||
status
==
JOB_TASK_STATUS_SUCCEED
);
}
int32_t
schValidateTaskReceivedMsgType
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
)
{
int32_t
lastMsgType
=
SCH_GET_TASK_LASTMSG_TYPE
(
pTask
);
switch
(
msgType
)
{
case
TDMT_VND_CREATE_TABLE_RSP
:
case
TDMT_VND_SUBMIT_RSP
:
...
...
@@ -114,19 +107,22 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
case
TDMT_VND_FETCH_RSP
:
case
TDMT_VND_DROP_TASK
:
if
(
lastMsgType
!=
(
msgType
-
1
))
{
SCH_TASK_ELOG
(
"rsp msg type mis-match, last sent msgType:%s, rspType:%s"
,
TMSG_INFO
(
lastMsgType
),
TMSG_INFO
(
msgType
));
SCH_TASK_ELOG
(
"rsp msg type mis-match, last sent msgType:%s, rspType:%s"
,
TMSG_INFO
(
lastMsgType
),
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_EXECUTING
&&
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_TASK_ELOG
(
"rsp msg conflicted with task status, status:%d, rspType:%s"
,
SCH_GET_TASK_STATUS
(
pTask
),
TMSG_INFO
(
msgType
));
if
(
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_EXECUTING
&&
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_TASK_ELOG
(
"rsp msg conflicted with task status, status:%d, rspType:%s"
,
SCH_GET_TASK_STATUS
(
pTask
),
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
break
;
default:
SCH_TASK_ELOG
(
"unknown rsp msg, type:%s, status:%d"
,
TMSG_INFO
(
msgType
),
SCH_GET_TASK_STATUS
(
pTask
));
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
...
...
@@ -135,7 +131,6 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
return
TSDB_CODE_SUCCESS
;
}
int32_t
schCheckAndUpdateJobStatus
(
SSchJob
*
pJob
,
int8_t
newStatus
)
{
int32_t
code
=
0
;
...
...
@@ -147,37 +142,34 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
if
(
oriStatus
==
newStatus
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
switch
(
oriStatus
)
{
case
JOB_TASK_STATUS_NULL
:
if
(
newStatus
!=
JOB_TASK_STATUS_NOT_START
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_NOT_START
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXECUTING
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_EXECUTING
:
if
(
newStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLING
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLING
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_PARTIAL_SUCCEED
:
if
(
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_SUCCEED
:
case
JOB_TASK_STATUS_FAILED
:
...
...
@@ -185,13 +177,13 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
if
(
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_CANCELLED
:
case
JOB_TASK_STATUS_DROPPING
:
SCH_ERR_JRET
(
TSDB_CODE_QRY_JOB_FREED
);
break
;
default:
SCH_JOB_ELOG
(
"invalid job status:%d"
,
oriStatus
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
...
...
@@ -211,27 +203,26 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
_return:
SCH_JOB_ELOG
(
"invalid job status update, from %d to %d"
,
oriStatus
,
newStatus
);
SCH_ERR_RET
(
code
);
}
int32_t
schBuildTaskRalation
(
SSchJob
*
pJob
,
SHashObj
*
planToTask
)
{
for
(
int32_t
i
=
0
;
i
<
pJob
->
levelNum
;
++
i
)
{
SSchLevel
*
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
for
(
int32_t
m
=
0
;
m
<
pLevel
->
taskNum
;
++
m
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
m
);
SSubplan
*
pPlan
=
pTask
->
plan
;
int32_t
childNum
=
pPlan
->
pChildren
?
(
int32_t
)
LIST_LENGTH
(
pPlan
->
pChildren
)
:
0
;
int32_t
parentNum
=
pPlan
->
pParents
?
(
int32_t
)
LIST_LENGTH
(
pPlan
->
pParents
)
:
0
;
int32_t
childNum
=
pPlan
->
pChildren
?
(
int32_t
)
LIST_LENGTH
(
pPlan
->
pChildren
)
:
0
;
int32_t
parentNum
=
pPlan
->
pParents
?
(
int32_t
)
LIST_LENGTH
(
pPlan
->
pParents
)
:
0
;
if
(
childNum
>
0
)
{
if
(
pJob
->
levelIdx
==
pLevel
->
level
)
{
SCH_JOB_ELOG
(
"invalid query plan, lowest level, childNum:%d"
,
childNum
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
pTask
->
children
=
taosArrayInit
(
childNum
,
POINTER_BYTES
);
if
(
NULL
==
pTask
->
children
)
{
SCH_TASK_ELOG
(
"taosArrayInit %d children failed"
,
childNum
);
...
...
@@ -240,7 +231,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for
(
int32_t
n
=
0
;
n
<
childNum
;
++
n
)
{
SSubplan
*
child
=
(
SSubplan
*
)
nodesListGetNode
(
pPlan
->
pChildren
,
n
);
SSubplan
*
child
=
(
SSubplan
*
)
nodesListGetNode
(
pPlan
->
pChildren
,
n
);
SSchTask
**
childTask
=
taosHashGet
(
planToTask
,
&
child
,
POINTER_BYTES
);
if
(
NULL
==
childTask
||
NULL
==
*
childTask
)
{
SCH_TASK_ELOG
(
"subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
...
...
@@ -258,7 +249,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_TASK_ELOG
(
"invalid task info, level:0, parentNum:%d"
,
parentNum
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
pTask
->
parents
=
taosArrayInit
(
parentNum
,
POINTER_BYTES
);
if
(
NULL
==
pTask
->
parents
)
{
SCH_TASK_ELOG
(
"taosArrayInit %d parents failed"
,
parentNum
);
...
...
@@ -272,7 +263,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for
(
int32_t
n
=
0
;
n
<
parentNum
;
++
n
)
{
SSubplan
*
parent
=
(
SSubplan
*
)
nodesListGetNode
(
pPlan
->
pParents
,
n
);
SSubplan
*
parent
=
(
SSubplan
*
)
nodesListGetNode
(
pPlan
->
pParents
,
n
);
SSchTask
**
parentTask
=
taosHashGet
(
planToTask
,
&
parent
,
POINTER_BYTES
);
if
(
NULL
==
parentTask
||
NULL
==
*
parentTask
)
{
SCH_TASK_ELOG
(
"subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
...
...
@@ -283,7 +274,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_TASK_ELOG
(
"taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
}
SCH_TASK_DLOG
(
"level:%d, parentNum:%d, childNum:%d"
,
i
,
parentNum
,
childNum
);
}
...
...
@@ -298,11 +289,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schRecordTaskSucceedNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
if
(
NULL
==
addr
)
{
SCH_TASK_ELOG
(
"taosArrayGet candidate addr failed, idx:%d, size:%d"
,
pTask
->
candidateIdx
,
(
int32_t
)
taosArrayGetSize
(
pTask
->
candidateAddrs
));
SCH_TASK_ELOG
(
"taosArrayGet candidate addr failed, idx:%d, size:%d"
,
pTask
->
candidateIdx
,
(
int32_t
)
taosArrayGetSize
(
pTask
->
candidateAddrs
));
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
...
...
@@ -311,7 +302,6 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schRecordTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
)
{
if
(
NULL
==
taosArrayPush
(
pTask
->
execAddrs
,
addr
))
{
SCH_TASK_ELOG
(
"taosArrayPush addr to execAddr list failed, errno:%d"
,
errno
);
...
...
@@ -321,23 +311,25 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad
return
TSDB_CODE_SUCCESS
;
}
int32_t
schValidateAndBuildJob
(
SQueryPlan
*
pDag
,
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
pJob
->
queryId
=
pDag
->
queryId
;
if
(
pDag
->
numOfSubplans
<=
0
)
{
SCH_JOB_ELOG
(
"invalid subplan num:%d"
,
pDag
->
numOfSubplans
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
levelNum
=
(
int32_t
)
LIST_LENGTH
(
pDag
->
pSubplans
);
if
(
levelNum
<=
0
)
{
SCH_JOB_ELOG
(
"invalid level num:%d"
,
levelNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SHashObj
*
planToTask
=
taosHashInit
(
SCHEDULE_DEFAULT_MAX_TASK_NUM
,
taosGetDefaultHashFunction
(
POINTER_BYTES
==
sizeof
(
int64_t
)
?
TSDB_DATA_TYPE_BIGINT
:
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
SHashObj
*
planToTask
=
taosHashInit
(
SCHEDULE_DEFAULT_MAX_TASK_NUM
,
taosGetDefaultHashFunction
(
POINTER_BYTES
==
sizeof
(
int64_t
)
?
TSDB_DATA_TYPE_BIGINT
:
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
planToTask
)
{
SCH_JOB_ELOG
(
"taosHashInit %d failed"
,
SCHEDULE_DEFAULT_MAX_TASK_NUM
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -354,10 +346,10 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pJob
->
subPlans
=
pDag
->
pSubplans
;
SSchLevel
level
=
{
0
};
SSchLevel
level
=
{
0
};
SNodeListNode
*
plans
=
NULL
;
int32_t
taskNum
=
0
;
SSchLevel
*
pLevel
=
NULL
;
int32_t
taskNum
=
0
;
SSchLevel
*
pLevel
=
NULL
;
level
.
status
=
JOB_TASK_STATUS_NOT_START
;
...
...
@@ -369,8 +361,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
pLevel
->
level
=
i
;
plans
=
(
SNodeListNode
*
)
nodesListGetNode
(
pDag
->
pSubplans
,
i
);
plans
=
(
SNodeListNode
*
)
nodesListGetNode
(
pDag
->
pSubplans
,
i
);
if
(
NULL
==
plans
)
{
SCH_JOB_ELOG
(
"empty level plan, level:%d"
,
i
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
...
@@ -383,15 +375,15 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
}
pLevel
->
taskNum
=
taskNum
;
pLevel
->
subTasks
=
taosArrayInit
(
taskNum
,
sizeof
(
SSchTask
));
if
(
NULL
==
pLevel
->
subTasks
)
{
SCH_JOB_ELOG
(
"taosArrayInit %d failed"
,
taskNum
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
for
(
int32_t
n
=
0
;
n
<
taskNum
;
++
n
)
{
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
plans
->
pNodeList
,
n
);
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
plans
->
pNodeList
,
n
);
SCH_SET_JOB_TYPE
(
pJob
,
plan
->
subplanType
);
...
...
@@ -399,13 +391,13 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SSchTask
*
pTask
=
&
task
;
SCH_ERR_JRET
(
schInitTask
(
pJob
,
&
task
,
plan
,
pLevel
));
void
*
p
=
taosArrayPush
(
pLevel
->
subTasks
,
&
task
);
if
(
NULL
==
p
)
{
SCH_TASK_ELOG
(
"taosArrayPush task to level failed, level:%d, taskIdx:%d"
,
pLevel
->
level
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
if
(
0
!=
taosHashPut
(
planToTask
,
&
plan
,
POINTER_BYTES
,
&
p
,
POINTER_BYTES
))
{
SCH_TASK_ELOG
(
"taosHashPut to planToTaks failed, taskIdx:%d"
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -452,10 +444,10 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
int32_t
nodeNum
=
0
;
if
(
pJob
->
nodeList
)
{
nodeNum
=
taosArrayGetSize
(
pJob
->
nodeList
);
for
(
int32_t
i
=
0
;
i
<
nodeNum
&&
addNum
<
SCH_MAX_CANDIDATE_EP_NUM
;
++
i
)
{
SQueryNodeAddr
*
naddr
=
taosArrayGet
(
pJob
->
nodeList
,
i
);
if
(
NULL
==
taosArrayPush
(
pTask
->
candidateAddrs
,
naddr
))
{
SCH_TASK_ELOG
(
"taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d"
,
addNum
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -470,14 +462,14 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
/*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
++epSet->numOfEps;
}
*/
/*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
++epSet->numOfEps;
}
*/
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -489,7 +481,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_ELOG
(
"task already in execTask list, code:%x"
,
code
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SCH_TASK_ELOG
(
"taosHashPut task to execTask list failed, errno:%d"
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -510,11 +502,11 @@ int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if
(
0
!=
code
)
{
if
(
HASH_NODE_EXIST
(
code
))
{
*
moved
=
true
;
SCH_TASK_ELOG
(
"task already in succTask list, status:%d"
,
SCH_GET_TASK_STATUS
(
pTask
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_TASK_ELOG
(
"taosHashPut task to succTask list failed, errno:%d"
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -522,13 +514,13 @@ int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
*
moved
=
true
;
SCH_TASK_DLOG
(
"task moved to succTask list, numOfTasks:%d"
,
taosHashGetSize
(
pJob
->
succTasks
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
schMoveTaskToFailList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
*
moved
)
{
*
moved
=
false
;
if
(
0
!=
taosHashRemove
(
pJob
->
execTasks
,
&
pTask
->
taskId
,
sizeof
(
pTask
->
taskId
)))
{
SCH_TASK_WLOG
(
"remove task from execTask list failed, may not exist, status:%d"
,
SCH_GET_TASK_STATUS
(
pTask
));
}
...
...
@@ -537,11 +529,11 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if
(
0
!=
code
)
{
if
(
HASH_NODE_EXIST
(
code
))
{
*
moved
=
true
;
SCH_TASK_WLOG
(
"task already in failTask list, status:%d"
,
SCH_GET_TASK_STATUS
(
pTask
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_TASK_ELOG
(
"taosHashPut task to failTask list failed, errno:%d"
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -549,11 +541,10 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
*
moved
=
true
;
SCH_TASK_DLOG
(
"task moved to failTask list, numOfTasks:%d"
,
taosHashGetSize
(
pJob
->
failTasks
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
schMoveTaskToExecList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
*
moved
)
{
if
(
0
!=
taosHashRemove
(
pJob
->
succTasks
,
&
pTask
->
taskId
,
sizeof
(
pTask
->
taskId
)))
{
SCH_TASK_WLOG
(
"remove task from succTask list failed, may not exist, status:%d"
,
SCH_GET_TASK_STATUS
(
pTask
));
...
...
@@ -563,11 +554,11 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if
(
0
!=
code
)
{
if
(
HASH_NODE_EXIST
(
code
))
{
*
moved
=
true
;
SCH_TASK_ELOG
(
"task already in execTask list, status:%d"
,
SCH_GET_TASK_STATUS
(
pTask
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_TASK_ELOG
(
"taosHashPut task to execTask list failed, errno:%d"
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -575,11 +566,10 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
*
moved
=
true
;
SCH_TASK_DLOG
(
"task moved to execTask list, numOfTasks:%d"
,
taosHashGetSize
(
pJob
->
execTasks
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
schTaskCheckSetRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
bool
*
needRetry
)
{
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
...
...
@@ -587,20 +577,17 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return
TSDB_CODE_SUCCESS
;
//TODO CHECK epList/condidateList
//
TODO CHECK epList/condidateList
if
(
SCH_IS_DATA_SRC_TASK
(
pTask
))
{
}
else
{
int32_t
candidateNum
=
taosArrayGetSize
(
pTask
->
candidateAddrs
);
if
((
pTask
->
candidateIdx
+
1
)
>=
candidateNum
)
{
return
TSDB_CODE_SUCCESS
;
}
++
pTask
->
candidateIdx
;
}
}
int32_t
schHandleTaskRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
...
...
@@ -623,9 +610,9 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
}
int32_t
schUpdateHbConnection
(
SQueryNodeEpId
*
epId
,
SSchHbTrans
*
trans
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SSchHbTrans
*
hb
=
NULL
;
while
(
true
)
{
hb
=
taosHashGet
(
schMgmt
.
hbConnections
,
epId
,
sizeof
(
SQueryNodeEpId
));
if
(
NULL
==
hb
)
{
...
...
@@ -639,9 +626,11 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) {
SCH_ERR_RET
(
code
);
}
qDebug
(
"hb connection updated, seqId:%"
PRIx64
", sId:%"
PRIx64
", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p"
,
trans
->
seqId
,
schMgmt
.
sId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
,
trans
->
trans
.
transInst
,
trans
->
trans
.
transHandle
);
qDebug
(
"hb connection updated, seqId:%"
PRIx64
", sId:%"
PRIx64
", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p"
,
trans
->
seqId
,
schMgmt
.
sId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
,
trans
->
trans
.
transInst
,
trans
->
trans
.
transHandle
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -649,11 +638,11 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) {
}
SCH_LOCK
(
SCH_WRITE
,
&
hb
->
lock
);
if
(
hb
->
seqId
>=
trans
->
seqId
)
{
qDebug
(
"hb trans seqId is old, seqId:%"
PRId64
", currentId:%"
PRId64
", nodeId:%d, fqdn:%s, port:%d"
,
trans
->
seqId
,
hb
->
seqId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
);
qDebug
(
"hb trans seqId is old, seqId:%"
PRId64
", currentId:%"
PRId64
", nodeId:%d, fqdn:%s, port:%d"
,
trans
->
seqId
,
hb
->
seqId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
);
SCH_UNLOCK
(
SCH_WRITE
,
&
hb
->
lock
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -663,16 +652,18 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) {
SCH_UNLOCK
(
SCH_WRITE
,
&
hb
->
lock
);
qDebug
(
"hb connection updated, seqId:%"
PRIx64
", sId:%"
PRIx64
", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p"
,
trans
->
seqId
,
schMgmt
.
sId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
,
trans
->
trans
.
transInst
,
trans
->
trans
.
transHandle
);
qDebug
(
"hb connection updated, seqId:%"
PRIx64
", sId:%"
PRIx64
", nodeId:%d, fqdn:%s, port:%d, instance:%p, connection:%p"
,
trans
->
seqId
,
schMgmt
.
sId
,
epId
->
nodeId
,
epId
->
ep
.
fqdn
,
epId
->
ep
.
port
,
trans
->
trans
.
transInst
,
trans
->
trans
.
transHandle
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schProcessOnJobFailureImpl
(
SSchJob
*
pJob
,
int32_t
status
,
int32_t
errCode
)
{
// if already FAILED, no more processing
SCH_ERR_RET
(
schCheckAndUpdateJobStatus
(
pJob
,
status
));
if
(
errCode
)
{
atomic_store_32
(
&
pJob
->
errCode
,
errCode
);
}
...
...
@@ -684,11 +675,10 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
int32_t
code
=
atomic_load_32
(
&
pJob
->
errCode
);
SCH_JOB_DLOG
(
"job failed with error: %s"
,
tstrerror
(
code
));
SCH_RET
(
code
);
}
// Note: no more task error processing, handled in function internal
int32_t
schProcessOnJobFailure
(
SSchJob
*
pJob
,
int32_t
errCode
)
{
SCH_RET
(
schProcessOnJobFailureImpl
(
pJob
,
JOB_TASK_STATUS_FAILED
,
errCode
));
...
...
@@ -699,18 +689,16 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
SCH_RET
(
schProcessOnJobFailureImpl
(
pJob
,
JOB_TASK_STATUS_DROPPING
,
errCode
));
}
// Note: no more task error processing, handled in function internal
int32_t
schProcessOnJobPartialSuccess
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
SCH_ERR_RET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_PARTIAL_SUCCEED
));
if
(
pJob
->
attr
.
syncSchedule
)
{
tsem_post
(
&
pJob
->
rspSem
);
}
if
(
atomic_load_8
(
&
pJob
->
userFetch
))
{
SCH_ERR_JRET
(
schFetchFromRemote
(
pJob
));
}
...
...
@@ -730,22 +718,22 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
// Note: no more task error processing, handled in function internal
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
)
{
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_DLOG
(
"task failed not processed cause of job status, job status:%d"
,
status
);
SCH_RET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
bool
needRetry
=
false
;
bool
moved
=
false
;
bool
needRetry
=
false
;
bool
moved
=
false
;
int32_t
taskDone
=
0
;
int32_t
code
=
0
;
SCH_TASK_DLOG
(
"taskOnFailure, code:%s"
,
tstrerror
(
errCode
));
SCH_ERR_JRET
(
schTaskCheckSetRetry
(
pJob
,
pTask
,
errCode
,
&
needRetry
));
if
(
!
needRetry
)
{
SCH_TASK_ELOG
(
"task failed and no more retry, code:%s"
,
tstrerror
(
errCode
));
...
...
@@ -757,7 +745,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
}
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_FAILED
);
if
(
SCH_TASK_NEED_WAIT_ALL
(
pTask
))
{
SCH_LOCK
(
SCH_WRITE
,
&
pTask
->
level
->
lock
);
pTask
->
level
->
taskFailed
++
;
...
...
@@ -765,7 +753,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
SCH_UNLOCK
(
SCH_WRITE
,
&
pTask
->
level
->
lock
);
atomic_store_32
(
&
pJob
->
errCode
,
errCode
);
if
(
taskDone
<
pTask
->
level
->
taskNum
)
{
SCH_TASK_DLOG
(
"not all tasks done, done:%d, all:%d"
,
taskDone
,
pTask
->
level
->
taskNum
);
SCH_ERR_RET
(
errCode
);
...
...
@@ -773,7 +761,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
}
}
else
{
SCH_ERR_JRET
(
schHandleTaskRetry
(
pJob
,
pTask
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -784,7 +772,7 @@ _return:
// Note: no more task error processing, handled in function internal
int32_t
schProcessOnTaskSuccess
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
bool
moved
=
false
;
bool
moved
=
false
;
int32_t
code
=
0
;
SCH_TASK_DLOG
(
"taskOnSuccess, status:%d"
,
SCH_GET_TASK_STATUS
(
pTask
));
...
...
@@ -796,17 +784,17 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_JRET
(
schRecordTaskSucceedNode
(
pJob
,
pTask
));
SCH_ERR_JRET
(
schLaunchTasksInFlowCtrlList
(
pJob
,
pTask
));
int32_t
parentNum
=
pTask
->
parents
?
(
int32_t
)
taosArrayGetSize
(
pTask
->
parents
)
:
0
;
if
(
parentNum
==
0
)
{
int32_t
taskDone
=
0
;
if
(
SCH_TASK_NEED_WAIT_ALL
(
pTask
))
{
SCH_LOCK
(
SCH_WRITE
,
&
pTask
->
level
->
lock
);
pTask
->
level
->
taskSucceed
++
;
taskDone
=
pTask
->
level
->
taskSucceed
+
pTask
->
level
->
taskFailed
;
SCH_UNLOCK
(
SCH_WRITE
,
&
pTask
->
level
->
lock
);
if
(
taskDone
<
pTask
->
level
->
taskNum
)
{
SCH_TASK_DLOG
(
"wait all tasks, done:%d, all:%d"
,
taskDone
,
pTask
->
level
->
taskNum
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -826,28 +814,31 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
pJob
->
fetchTask
=
pTask
;
SCH_ERR_JRET
(
schMoveTaskToExecList
(
pJob
,
pTask
,
&
moved
));
SCH_RET
(
schProcessOnJobPartialSuccess
(
pJob
));
}
/*
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
/*
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
++job->dataSrcEps.numOfEps;
}
*/
++job->dataSrcEps.numOfEps;
}
*/
for
(
int32_t
i
=
0
;
i
<
parentNum
;
++
i
)
{
SSchTask
*
par
=
*
(
SSchTask
**
)
taosArrayGet
(
pTask
->
parents
,
i
);
int32_t
readyNum
=
atomic_add_fetch_32
(
&
par
->
childReady
,
1
);
int32_t
readyNum
=
atomic_add_fetch_32
(
&
par
->
childReady
,
1
);
SCH_LOCK
(
SCH_WRITE
,
&
par
->
lock
);
SDownstreamSourceNode
source
=
{.
type
=
QUERY_NODE_DOWNSTREAM_SOURCE
,
.
taskId
=
pTask
->
taskId
,
.
schedId
=
schMgmt
.
sId
,
.
addr
=
pTask
->
succeedAddr
};
SDownstreamSourceNode
source
=
{.
type
=
QUERY_NODE_DOWNSTREAM_SOURCE
,
.
taskId
=
pTask
->
taskId
,
.
schedId
=
schMgmt
.
sId
,
.
addr
=
pTask
->
succeedAddr
};
qSetSubplanExecutionNode
(
par
->
plan
,
pTask
->
plan
->
id
.
groupId
,
&
source
);
SCH_UNLOCK
(
SCH_WRITE
,
&
par
->
lock
);
if
(
SCH_TASK_READY_TO_LUNCH
(
readyNum
,
par
))
{
SCH_ERR_RET
(
schLaunchTaskImpl
(
pJob
,
par
));
}
...
...
@@ -860,11 +851,10 @@ _return:
SCH_RET
(
schProcessOnJobFailure
(
pJob
,
code
));
}
// Note: no more error processing, handled in function internal
int32_t
schFetchFromRemote
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
if
(
atomic_val_compare_exchange_32
(
&
pJob
->
remoteFetch
,
0
,
1
)
!=
0
)
{
SCH_JOB_ELOG
(
"prior fetching not finished, remoteFetch:%d"
,
atomic_load_32
(
&
pJob
->
remoteFetch
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -881,7 +871,7 @@ int32_t schFetchFromRemote(SSchJob *pJob) {
SCH_ERR_JRET
(
schBuildAndSendMsg
(
pJob
,
pJob
->
fetchTask
,
&
pJob
->
resNode
,
TDMT_VND_FETCH
));
return
TSDB_CODE_SUCCESS
;
_return:
atomic_val_compare_exchange_32
(
&
pJob
->
remoteFetch
,
1
,
0
);
...
...
@@ -889,15 +879,15 @@ _return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pJob
->
fetchTask
,
code
));
}
// Note: no more task error processing, handled in function internal
int32_t
schHandleResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
)
{
int32_t
schHandleResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_ELOG
(
"rsp not processed cause of job status, job status:%d"
,
status
);
SCH_RET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
...
...
@@ -905,13 +895,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
switch
(
msgType
)
{
case
TDMT_VND_CREATE_TABLE_RSP
:
{
SCH_ERR_JRET
(
rspCode
);
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
SCH_ERR_JRET
(
rspCode
);
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
break
;
}
break
;
}
case
TDMT_VND_SUBMIT_RSP
:
{
#if 0 //
TODO OPEN THIS
#if 0 //
TODO OPEN THIS
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
...
...
@@ -919,77 +909,77 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
pJob->resNumOfRows += rsp->affectedRows;
#else
SCH_ERR_JRET
(
rspCode
);
#else
SCH_ERR_JRET
(
rspCode
);
SSubmitRsp
*
rsp
=
(
SSubmitRsp
*
)
msg
;
if
(
rsp
)
{
pJob
->
resNumOfRows
+=
rsp
->
affectedRows
;
}
#endif
SSubmitRsp
*
rsp
=
(
SSubmitRsp
*
)
msg
;
if
(
rsp
)
{
pJob
->
resNumOfRows
+=
rsp
->
affectedRows
;
}
#endif
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
break
;
}
break
;
}
case
TDMT_VND_QUERY_RSP
:
{
SQueryTableRsp
*
rsp
=
(
SQueryTableRsp
*
)
msg
;
SCH_ERR_JRET
(
rspCode
);
if
(
NULL
==
msg
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SCH_ERR_JRET
(
rsp
->
code
);
SCH_ERR_JRET
(
schBuildAndSendMsg
(
pJob
,
pTask
,
NULL
,
TDMT_VND_RES_READY
));
break
;
SQueryTableRsp
*
rsp
=
(
SQueryTableRsp
*
)
msg
;
SCH_ERR_JRET
(
rspCode
);
if
(
NULL
==
msg
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SCH_ERR_JRET
(
rsp
->
code
);
SCH_ERR_JRET
(
schBuildAndSendMsg
(
pJob
,
pTask
,
NULL
,
TDMT_VND_RES_READY
));
break
;
}
case
TDMT_VND_RES_READY_RSP
:
{
SResReadyRsp
*
rsp
=
(
SResReadyRsp
*
)
msg
;
SCH_ERR_JRET
(
rspCode
);
if
(
NULL
==
msg
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SCH_ERR_JRET
(
rsp
->
code
);
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
break
;
SResReadyRsp
*
rsp
=
(
SResReadyRsp
*
)
msg
;
SCH_ERR_JRET
(
rspCode
);
if
(
NULL
==
msg
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
case
TDMT_VND_FETCH_RSP
:
{
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
msg
;
SCH_ERR_JRET
(
rsp
->
code
);
SCH_ERR_JRET
(
rspCode
);
if
(
NULL
==
msg
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
if
(
pJob
->
res
)
{
SCH_TASK_ELOG
(
"got fetch rsp while res already exists, res:%p"
,
pJob
->
res
);
tfree
(
rsp
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
break
;
}
case
TDMT_VND_FETCH_RSP
:
{
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
msg
;
atomic_store_ptr
(
&
pJob
->
res
,
rsp
);
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
htonl
(
rsp
->
numOfRows
));
SCH_ERR_JRET
(
rspCode
);
if
(
NULL
==
msg
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
if
(
rsp
->
completed
)
{
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_SUCCEED
);
}
if
(
pJob
->
res
)
{
SCH_TASK_ELOG
(
"got fetch rsp while res already exists, res:%p"
,
pJob
->
res
);
tfree
(
rsp
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_TASK_DLOG
(
"got fetch rsp, rows:%d, complete:%d"
,
htonl
(
rsp
->
numOfRows
),
rsp
->
completed
);
atomic_store_ptr
(
&
pJob
->
res
,
rsp
);
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
htonl
(
rsp
->
numOfRows
));
schProcessOnDataFetched
(
pJob
);
break
;
if
(
rsp
->
completed
)
{
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_SUCCEED
)
;
}
SCH_TASK_DLOG
(
"got fetch rsp, rows:%d, complete:%d"
,
htonl
(
rsp
->
numOfRows
),
rsp
->
completed
);
schProcessOnDataFetched
(
pJob
);
break
;
}
case
TDMT_VND_DROP_TASK_RSP
:
{
// SHOULD NEVER REACH HERE
SCH_TASK_ELOG
(
"invalid status to handle drop task rsp, refId:%"
PRIx64
,
pJob
->
refId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
break
;
}
// SHOULD NEVER REACH HERE
SCH_TASK_ELOG
(
"invalid status to handle drop task rsp, refId:%"
PRIx64
,
pJob
->
refId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
break
;
}
default:
SCH_TASK_ELOG
(
"unknown rsp msg, type:%d, status:%d"
,
msgType
,
SCH_GET_TASK_STATUS
(
pTask
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
...
@@ -1002,15 +992,15 @@ _return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
}
int32_t
schHandleCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
msgType
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int32_t
schHandleCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
msgType
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SSchCallbackParam
*
pParam
=
(
SSchCallbackParam
*
)
param
;
SSchTask
*
pTask
=
NULL
;
SSchTask
*
pTask
=
NULL
;
SSchJob
*
pJob
=
schAcquireJob
(
pParam
->
refId
);
if
(
NULL
==
pJob
)
{
qError
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
"taosAcquireRef job failed, may be dropped, refId:%"
PRIx64
,
pParam
->
queryId
,
pParam
->
taskId
,
pParam
->
refId
);
qError
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
"taosAcquireRef job failed, may be dropped, refId:%"
PRIx64
,
pParam
->
queryId
,
pParam
->
taskId
,
pParam
->
refId
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_JOB_FREED
);
}
...
...
@@ -1028,8 +1018,8 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
pTask
=
*
task
;
SCH_TASK_DLOG
(
"rsp msg received, type:%s, code:%s"
,
TMSG_INFO
(
msgType
),
tstrerror
(
rspCode
));
pTask
->
handle
=
pMsg
->
handle
;
pTask
->
handle
=
pMsg
->
handle
;
SCH_ERR_JRET
(
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
));
_return:
...
...
@@ -1042,42 +1032,41 @@ _return:
SCH_RET
(
code
);
}
int32_t
schHandleSubmitCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
schHandleSubmitCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_SUBMIT_RSP
,
code
);
}
int32_t
schHandleCreateTableCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
schHandleCreateTableCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_CREATE_TABLE_RSP
,
code
);
}
int32_t
schHandleQueryCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
schHandleQueryCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_QUERY_RSP
,
code
);
}
int32_t
schHandleFetchCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
schHandleFetchCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_FETCH_RSP
,
code
);
}
int32_t
schHandleReadyCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
schHandleReadyCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_RES_READY_RSP
,
code
);
}
int32_t
schHandleDropCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
schHandleDropCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SSchCallbackParam
*
pParam
=
(
SSchCallbackParam
*
)
param
;
qDebug
(
"QID:%"
PRIx64
",TID:%"
PRIx64
" drop task rsp received, code:%x"
,
pParam
->
queryId
,
pParam
->
taskId
,
code
);
qDebug
(
"QID:%"
PRIx64
",TID:%"
PRIx64
" drop task rsp received, code:%x"
,
pParam
->
queryId
,
pParam
->
taskId
,
code
);
}
int32_t
schHandleHbCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
schHandleHbCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
if
(
code
)
{
qError
(
"hb rsp error:%s"
,
tstrerror
(
code
));
SCH_ERR_RET
(
code
);
}
SSchedulerHbRsp
rsp
=
{
0
};
SSchCallbackParam
*
pParam
=
(
SSchCallbackParam
*
)
param
;
if
(
tDeserializeSSchedulerHbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
rsp
))
{
qError
(
"invalid hb rsp msg, size:%d"
,
pMsg
->
len
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
...
@@ -1088,21 +1077,22 @@ int32_t schHandleHbCallback(void* param, const SDataBuf* pMsg, int32_t code) {
trans
.
seqId
=
rsp
.
seqId
;
trans
.
trans
.
transInst
=
pParam
->
transport
;
trans
.
trans
.
transHandle
=
pMsg
->
handle
;
SCH_RET
(
schUpdateHbConnection
(
&
rsp
.
epId
,
&
trans
));
}
int32_t
taskNum
=
(
int32_t
)
taosArrayGetSize
(
rsp
.
taskStatus
);
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
STaskStatus
*
taskStatus
=
taosArrayGet
(
rsp
.
taskStatus
,
i
);
SSchJob
*
pJob
=
schAcquireJob
(
taskStatus
->
refId
);
if
(
NULL
==
pJob
)
{
qWarn
(
"job not found, refId:0x%"
PRIx64
",QID:0x%"
PRIx64
",TID:0x%"
PRIx64
,
taskStatus
->
refId
,
taskStatus
->
queryId
,
taskStatus
->
taskId
);
//TODO DROP TASK FROM SERVER!!!!
qWarn
(
"job not found, refId:0x%"
PRIx64
",QID:0x%"
PRIx64
",TID:0x%"
PRIx64
,
taskStatus
->
refId
,
taskStatus
->
queryId
,
taskStatus
->
taskId
);
// TODO DROP TASK FROM SERVER!!!!
continue
;
}
// TODO
schReleaseJob
(
taskStatus
->
refId
);
...
...
@@ -1115,22 +1105,21 @@ _return:
SCH_RET
(
code
);
}
int32_t
schGetCallbackFp
(
int32_t
msgType
,
__async_send_cb_fn_t
*
fp
)
{
switch
(
msgType
)
{
case
TDMT_VND_CREATE_TABLE
:
*
fp
=
schHandleCreateTableCallback
;
break
;
case
TDMT_VND_SUBMIT
:
case
TDMT_VND_SUBMIT
:
*
fp
=
schHandleSubmitCallback
;
break
;
case
TDMT_VND_QUERY
:
case
TDMT_VND_QUERY
:
*
fp
=
schHandleQueryCallback
;
break
;
case
TDMT_VND_RES_READY
:
case
TDMT_VND_RES_READY
:
*
fp
=
schHandleReadyCallback
;
break
;
case
TDMT_VND_FETCH
:
case
TDMT_VND_FETCH
:
*
fp
=
schHandleFetchCallback
;
break
;
case
TDMT_VND_DROP_TASK
:
...
...
@@ -1147,13 +1136,13 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schAsyncSendMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
*
transport
,
SEpSet
*
epSet
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
schAsyncSendMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
void
*
transport
,
SEpSet
*
epSet
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
code
=
0
;
SSchTrans
*
trans
=
(
SSchTrans
*
)
transport
;
SMsgSendInfo
*
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
SMsgSendInfo
*
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
NULL
==
pMsgSendInfo
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SMsgSendInfo
));
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -1173,15 +1162,14 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet*
param
->
taskId
=
SCH_TASK_ID
(
pTask
);
param
->
transport
=
trans
->
transInst
;
pMsgSendInfo
->
param
=
param
;
pMsgSendInfo
->
msgInfo
.
pData
=
msg
;
pMsgSendInfo
->
msgInfo
.
len
=
msgSize
;
pMsgSendInfo
->
msgInfo
.
handle
=
trans
->
transHandle
;
pMsgSendInfo
->
msgInfo
.
handle
=
trans
->
transHandle
;
pMsgSendInfo
->
msgType
=
msgType
;
pMsgSendInfo
->
fp
=
fp
;
int64_t
transporterId
=
0
;
int64_t
transporterId
=
0
;
code
=
asyncSendMsgToServer
(
trans
->
transInst
,
epSet
,
&
transporterId
,
pMsgSendInfo
);
if
(
code
)
{
SCH_ERR_JRET
(
code
);
...
...
@@ -1191,7 +1179,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet*
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
param
);
tfree
(
pMsgSendInfo
);
SCH_RET
(
code
);
...
...
@@ -1199,9 +1187,9 @@ _return:
int32_t
schBuildAndSendMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
msgType
)
{
uint32_t
msgSize
=
0
;
void
*
msg
=
NULL
;
int32_t
code
=
0
;
bool
isCandidateAddr
=
false
;
void
*
msg
=
NULL
;
int32_t
code
=
0
;
bool
isCandidateAddr
=
false
;
if
(
NULL
==
addr
)
{
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
isCandidateAddr
=
true
;
...
...
@@ -1235,13 +1223,13 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SSubQueryMsg
*
pMsg
=
msg
;
pMsg
->
header
.
vgId
=
htonl
(
addr
->
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
refId
=
htobe64
(
pJob
->
refId
);
pMsg
->
taskType
=
TASK_TYPE_TEMP
;
pMsg
->
phyLen
=
htonl
(
pTask
->
msgLen
);
pMsg
->
sqlLen
=
htonl
(
len
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
refId
=
htobe64
(
pJob
->
refId
);
pMsg
->
taskType
=
TASK_TYPE_TEMP
;
pMsg
->
phyLen
=
htonl
(
pTask
->
msgLen
);
pMsg
->
sqlLen
=
htonl
(
len
);
memcpy
(
pMsg
->
msg
,
pJob
->
sql
,
len
);
memcpy
(
pMsg
->
msg
+
len
,
pTask
->
msg
,
pTask
->
msgLen
);
...
...
@@ -1257,12 +1245,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
}
SResReadyReq
*
pMsg
=
msg
;
pMsg
->
header
.
vgId
=
htonl
(
addr
->
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
header
.
vgId
=
htonl
(
addr
->
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
break
;
}
case
TDMT_VND_FETCH
:
{
...
...
@@ -1272,32 +1260,32 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_TASK_ELOG
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SResFetchReq
*
pMsg
=
msg
;
pMsg
->
header
.
vgId
=
htonl
(
addr
->
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
header
.
vgId
=
htonl
(
addr
->
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
break
;
}
case
TDMT_VND_DROP_TASK
:{
case
TDMT_VND_DROP_TASK
:
{
msgSize
=
sizeof
(
STaskDropReq
);
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
SCH_TASK_ELOG
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
STaskDropReq
*
pMsg
=
msg
;
pMsg
->
header
.
vgId
=
htonl
(
addr
->
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
header
.
vgId
=
htonl
(
addr
->
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
pJob
->
queryId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
refId
=
htobe64
(
pJob
->
refId
);
pMsg
->
taskId
=
htobe64
(
pTask
->
taskId
);
pMsg
->
refId
=
htobe64
(
pJob
->
refId
);
break
;
}
case
TDMT_VND_QUERY_HEARTBEAT
:
{
...
...
@@ -1337,24 +1325,24 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
if
(
isCandidateAddr
)
{
SCH_ERR_RET
(
schRecordTaskExecNode
(
pJob
,
pTask
,
addr
));
}
return
TSDB_CODE_SUCCESS
;
_return:
SCH_SET_TASK_LASTMSG_TYPE
(
pTask
,
-
1
);
tfree
(
msg
);
SCH_RET
(
code
);
}
int32_t
schEnsureHbConnection
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
SQueryNodeAddr
*
addr
=
taosArrayGet
(
pTask
->
candidateAddrs
,
pTask
->
candidateIdx
);
SQueryNodeEpId
epId
=
{
0
};
SQueryNodeEpId
epId
=
{
0
};
epId
.
nodeId
=
addr
->
nodeId
;
memcpy
(
&
epId
.
ep
,
SCH_GET_CUR_EP
(
addr
),
sizeof
(
SEp
));
SSchHbTrans
*
hb
=
taosHashGet
(
schMgmt
.
hbConnections
,
&
epId
,
sizeof
(
SQueryNodeEpId
));
if
(
NULL
==
hb
)
{
SCH_ERR_RET
(
schBuildAndSendMsg
(
pJob
,
NULL
,
addr
,
TDMT_VND_QUERY_HEARTBEAT
));
...
...
@@ -1364,29 +1352,30 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
}
int32_t
schLaunchTaskImpl
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int8_t
status
=
0
;
int8_t
status
=
0
;
int32_t
code
=
0
;
atomic_add_fetch_32
(
&
pTask
->
level
->
taskLaunchedNum
,
1
);
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_DLOG
(
"no need to launch task cause of job status, job status:%d"
,
status
);
SCH_RET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
SSubplan
*
plan
=
pTask
->
plan
;
if
(
NULL
==
pTask
->
msg
)
{
// TODO add more detailed reason for failure
if
(
NULL
==
pTask
->
msg
)
{
// TODO add more detailed reason for failure
code
=
qSubPlanToString
(
plan
,
&
pTask
->
msg
,
&
pTask
->
msgLen
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
SCH_TASK_ELOG
(
"failed to create physical plan, code:%s, msg:%p, len:%d"
,
tstrerror
(
code
),
pTask
->
msg
,
pTask
->
msgLen
);
SCH_TASK_ELOG
(
"failed to create physical plan, code:%s, msg:%p, len:%d"
,
tstrerror
(
code
),
pTask
->
msg
,
pTask
->
msgLen
);
SCH_ERR_RET
(
code
);
}
else
{
SCH_TASK_DLOG
(
"physical plan len:%d, %s"
,
pTask
->
msgLen
,
pTask
->
msg
);
}
}
SCH_ERR_RET
(
schSetTaskCandidateAddrs
(
pJob
,
pTask
));
// NOTE: race condition: the task should be put into the hash table before send msg to server
...
...
@@ -1398,15 +1387,15 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
if
(
SCH_IS_QUERY_JOB
(
pJob
))
{
SCH_ERR_RET
(
schEnsureHbConnection
(
pJob
,
pTask
));
}
SCH_ERR_RET
(
schBuildAndSendMsg
(
pJob
,
pTask
,
NULL
,
plan
->
msgType
));
return
TSDB_CODE_SUCCESS
;
}
// Note: no more error processing, handled in function internal
int32_t
schLaunchTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
bool
enough
=
false
;
bool
enough
=
false
;
int32_t
code
=
0
;
if
(
SCH_TASK_NEED_FLOW_CTRL
(
pJob
,
pTask
))
{
...
...
@@ -1436,11 +1425,9 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schLaunchJob
(
SSchJob
*
pJob
)
{
SSchLevel
*
level
=
taosArrayGet
(
pJob
->
levels
,
pJob
->
levelIdx
);
SCH_ERR_RET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_EXECUTING
));
SCH_ERR_RET
(
schCheckJobNeedFlowCtrl
(
pJob
,
level
));
...
...
@@ -1457,7 +1444,7 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
}
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pTask
->
execAddrs
);
if
(
size
<=
0
)
{
SCH_TASK_DLOG
(
"task has no exec address, no need to drop it, status:%d"
,
SCH_GET_TASK_STATUS
(
pTask
));
return
;
...
...
@@ -1481,9 +1468,9 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
if
(
!
SCH_TASK_NO_NEED_DROP
(
pTask
))
{
schDropTaskOnExecutedNode
(
pJob
,
pTask
);
}
pIter
=
taosHashIterate
(
list
,
pIter
);
}
}
}
void
schDropJobAllTasks
(
SSchJob
*
pJob
)
{
...
...
@@ -1493,10 +1480,9 @@ void schDropJobAllTasks(SSchJob *pJob) {
}
int32_t
schCancelJob
(
SSchJob
*
pJob
)
{
//TODO
//TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
// TODO
// TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
}
void
schFreeJobImpl
(
void
*
job
)
{
...
...
@@ -1506,7 +1492,7 @@ void schFreeJobImpl(void *job) {
SSchJob
*
pJob
=
job
;
uint64_t
queryId
=
pJob
->
queryId
;
int64_t
refId
=
pJob
->
refId
;
int64_t
refId
=
pJob
->
refId
;
if
(
pJob
->
status
==
JOB_TASK_STATUS_EXECUTING
)
{
schCancelJob
(
pJob
);
...
...
@@ -1514,55 +1500,55 @@ void schFreeJobImpl(void *job) {
schDropJobAllTasks
(
pJob
);
pJob
->
subPlans
=
NULL
;
// it is a reference to pDag->pSubplans
pJob
->
subPlans
=
NULL
;
// it is a reference to pDag->pSubplans
int32_t
numOfLevels
=
taosArrayGetSize
(
pJob
->
levels
);
for
(
int32_t
i
=
0
;
i
<
numOfLevels
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfLevels
;
++
i
)
{
SSchLevel
*
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
schFreeFlowCtrl
(
pLevel
);
int32_t
numOfTasks
=
taosArrayGetSize
(
pLevel
->
subTasks
);
for
(
int32_t
j
=
0
;
j
<
numOfTasks
;
++
j
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
j
);
for
(
int32_t
j
=
0
;
j
<
numOfTasks
;
++
j
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
j
);
schFreeTask
(
pTask
);
}
taosArrayDestroy
(
pLevel
->
subTasks
);
}
taosHashCleanup
(
pJob
->
execTasks
);
taosHashCleanup
(
pJob
->
failTasks
);
taosHashCleanup
(
pJob
->
succTasks
);
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
tfree
(
pJob
->
res
);
tfree
(
pJob
);
qDebug
(
"QID:0x%"
PRIx64
" job freed, refId:%"
PRIx64
", pointer:%p"
,
queryId
,
refId
,
pJob
);
qDebug
(
"QID:0x%"
PRIx64
" job freed, refId:%"
PRIx64
", pointer:%p"
,
queryId
,
refId
,
pJob
);
}
static
int32_t
schExecJobImpl
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
bool
syncSchedule
)
{
qDebug
(
"QID:0x%"
PRIx64
" job started"
,
pDag
->
queryId
);
static
int32_t
schExecJobImpl
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
bool
syncSchedule
)
{
qDebug
(
"QID:0x%"
PRIx64
" job started"
,
pDag
->
queryId
);
if
(
pNodeList
==
NULL
||
(
pNodeList
&&
taosArrayGetSize
(
pNodeList
)
<=
0
))
{
qDebug
(
"QID:0x%"
PRIx64
" input exec nodeList is empty"
,
pDag
->
queryId
);
qDebug
(
"QID:0x%"
PRIx64
" input exec nodeList is empty"
,
pDag
->
queryId
);
}
int32_t
code
=
0
;
int32_t
code
=
0
;
SSchJob
*
pJob
=
calloc
(
1
,
sizeof
(
SSchJob
));
if
(
NULL
==
pJob
)
{
qError
(
"QID:%"
PRIx64
" calloc %d failed"
,
pDag
->
queryId
,
(
int32_t
)
sizeof
(
SSchJob
));
qError
(
"QID:%"
PRIx64
" calloc %d failed"
,
pDag
->
queryId
,
(
int32_t
)
sizeof
(
SSchJob
));
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
pJob
->
attr
.
syncSchedule
=
syncSchedule
;
pJob
->
transport
=
transport
;
pJob
->
sql
=
sql
;
pJob
->
sql
=
sql
;
if
(
pNodeList
!=
NULL
)
{
pJob
->
nodeList
=
taosArrayDup
(
pNodeList
);
...
...
@@ -1570,19 +1556,22 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD
SCH_ERR_JRET
(
schValidateAndBuildJob
(
pDag
,
pJob
));
pJob
->
execTasks
=
taosHashInit
(
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
pJob
->
execTasks
=
taosHashInit
(
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pJob
->
execTasks
)
{
SCH_JOB_ELOG
(
"taosHashInit %d execTasks failed"
,
pDag
->
numOfSubplans
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
pJob
->
succTasks
=
taosHashInit
(
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
pJob
->
succTasks
=
taosHashInit
(
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pJob
->
succTasks
)
{
SCH_JOB_ELOG
(
"taosHashInit %d succTasks failed"
,
pDag
->
numOfSubplans
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
pJob
->
failTasks
=
taosHashInit
(
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
pJob
->
failTasks
=
taosHashInit
(
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pJob
->
failTasks
)
{
SCH_JOB_ELOG
(
"taosHashInit %d failTasks failed"
,
pDag
->
numOfSubplans
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -1602,9 +1591,9 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD
SCH_ERR_JRET
(
schLaunchJob
(
pJob
));
schAcquireJob
(
pJob
->
refId
);
*
job
=
pJob
->
refId
;
if
(
syncSchedule
)
{
SCH_JOB_DLOG
(
"will wait for rsp now, job status:%d"
,
SCH_GET_JOB_STATUS
(
pJob
));
tsem_wait
(
&
pJob
->
rspSem
);
...
...
@@ -1613,7 +1602,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD
SCH_JOB_DLOG
(
"job exec done, job status:%d"
,
SCH_GET_JOB_STATUS
(
pJob
));
schReleaseJob
(
pJob
->
refId
);
return
TSDB_CODE_SUCCESS
;
_return:
...
...
@@ -1622,7 +1611,6 @@ _return:
SCH_RET
(
code
);
}
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
)
{
if
(
schMgmt
.
jobRef
)
{
qError
(
"scheduler already initialized"
);
...
...
@@ -1631,7 +1619,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
if
(
cfg
)
{
schMgmt
.
cfg
=
*
cfg
;
if
(
schMgmt
.
cfg
.
maxJobNum
==
0
)
{
schMgmt
.
cfg
.
maxJobNum
=
SCHEDULE_DEFAULT_MAX_JOB_NUM
;
}
...
...
@@ -1642,7 +1630,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt
.
cfg
.
maxJobNum
=
SCHEDULE_DEFAULT_MAX_JOB_NUM
;
schMgmt
.
cfg
.
maxNodeTableNum
=
SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM
;
}
schMgmt
.
jobRef
=
taosOpenRef
(
schMgmt
.
cfg
.
maxJobNum
,
schFreeJobImpl
);
if
(
schMgmt
.
jobRef
<
0
)
{
qError
(
"init schduler jobRef failed, num:%u"
,
schMgmt
.
cfg
.
maxJobNum
);
...
...
@@ -1660,12 +1648,13 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
SCH_ERR_RET
(
TSDB_CODE_QRY_SYS_ERROR
);
}
qInfo
(
"scheduler %"
PRIx64
" initizlized, maxJob:%u"
,
schMgmt
.
sId
,
schMgmt
.
cfg
.
maxJobNum
);
qInfo
(
"scheduler %"
PRIx64
" initizlized, maxJob:%u"
,
schMgmt
.
sId
,
schMgmt
.
cfg
.
maxJobNum
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
SQueryResult
*
pRes
)
{
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
SQueryResult
*
pRes
)
{
if
(
NULL
==
transport
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
||
NULL
==
pRes
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
...
...
@@ -1676,20 +1665,21 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, in
pRes
->
code
=
atomic_load_32
(
&
job
->
errCode
);
pRes
->
numOfRows
=
job
->
resNumOfRows
;
schReleaseJob
(
*
pJob
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schedulerAsyncExecJob
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
const
char
*
sql
,
int64_t
*
pJob
)
{
int32_t
schedulerAsyncExecJob
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
const
char
*
sql
,
int64_t
*
pJob
)
{
if
(
NULL
==
transport
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SCH_ERR_RET
(
schExecJobImpl
(
transport
,
pNodeList
,
pDag
,
pJob
,
sql
,
false
));
return
TSDB_CODE_SUCCESS
;
}
#if 0
int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
...
...
@@ -1810,14 +1800,14 @@ _return:
SCH_RET(code);
}
#endif
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
pData
)
{
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
pData
)
{
if
(
NULL
==
pData
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
int32_t
code
=
0
;
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, refId:%"
PRIx64
,
job
);
...
...
@@ -1861,12 +1851,11 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
SCH_JOB_ELOG
(
"job failed or dropping, status:%d"
,
status
);
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
if
(
pJob
->
res
&&
((
SRetrieveTableRsp
*
)
pJob
->
res
)
->
completed
)
{
SCH_ERR_JRET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCCEED
));
}
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
res
,
*
pData
,
NULL
))
{
...
...
@@ -1891,7 +1880,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
_return:
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
schReleaseJob
(
job
);
SCH_RET
(
code
);
...
...
@@ -1944,17 +1933,17 @@ void schedulerFreeTaskList(SArray *taskList) {
taosArrayDestroy
(
taskList
);
}
void
schedulerDestroy
(
void
)
{
if
(
schMgmt
.
jobRef
)
{
SSchJob
*
pJob
=
taosIterateRef
(
schMgmt
.
jobRef
,
0
);
while
(
pJob
)
{
taosRemoveRef
(
schMgmt
.
jobRef
,
pJob
->
refId
);
pJob
=
taosIterateRef
(
schMgmt
.
jobRef
,
pJob
->
refId
);
}
taosCloseRef
(
schMgmt
.
jobRef
);
schMgmt
.
jobRef
=
0
;
}
...
...
source/util/src/tuuid.c
0 → 100644
浏览文件 @
eced27c4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tuuid.h"
static
int64_t
hashId
=
0
;
static
int32_t
SerialNo
=
0
;
int32_t
tGenIdPI32
(
void
)
{
if
(
hashId
==
0
)
{
char
uid
[
64
];
int32_t
code
=
taosGetSystemUUID
(
uid
,
tListLen
(
uid
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
else
{
hashId
=
MurmurHash3_32
(
uid
,
strlen
(
uid
));
}
}
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
SerialNo
,
1
);
int32_t
id
=
((
hashId
&
0x1F
)
<<
26
)
|
((
pid
&
0x3F
)
<<
20
)
|
((
ts
&
0xFFF
)
<<
8
)
|
(
val
&
0xFF
);
return
id
;
}
int64_t
tGenIdPI64
(
void
)
{
if
(
hashId
==
0
)
{
char
uid
[
64
];
int32_t
code
=
taosGetSystemUUID
(
uid
,
tListLen
(
uid
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
else
{
hashId
=
MurmurHash3_32
(
uid
,
strlen
(
uid
));
}
}
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
SerialNo
,
1
);
int64_t
id
=
((
hashId
&
0x07FF
)
<<
52
)
|
((
pid
&
0x0FFF
)
<<
40
)
|
((
ts
&
0xFFFFFF
)
<<
16
)
|
(
val
&
0xFFFF
);
return
id
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录