taosdata / TDengine
Commit 54573927
Authored on Jan 07, 2022 by Xiaoyu Wang
Merge remote-tracking branch 'origin/feature/qnode' into feature/3.0_wxy
Parents: fda3f090, 797d0891

Showing 7 changed files with 397 additions and 297 deletions (+397, -297)
include/libs/qcom/query.h                   +2    -1
include/libs/scheduler/scheduler.h          +1    -1
source/libs/catalog/src/catalog.c           +10   -0
source/libs/parser/src/astValidate.c        +34   -1
source/libs/planner/src/physicalPlan.c      +11   -6
source/libs/scheduler/inc/schedulerInt.h    +32   -23
source/libs/scheduler/src/scheduler.c       +307  -265
include/libs/qcom/query.h
@@ -91,9 +91,10 @@ enum {
   META_TYPE_NON_TABLE = 1,
   META_TYPE_CTABLE,
   META_TYPE_TABLE,
   META_TYPE_BOTH_TABLE,
 };

 typedef struct STableMetaOutput {
   int32_t  metaType;
   char     ctbFname[TSDB_TABLE_FNAME_LEN];
...
include/libs/scheduler/scheduler.h
@@ -24,7 +24,7 @@ extern "C" {
 #include "catalog.h"

 typedef struct SSchedulerCfg {
-  int32_t  maxJobNum;
+  uint32_t maxJobNum;
 } SSchedulerCfg;

 typedef struct SQueryProfileSummary {
...
source/libs/catalog/src/catalog.c
@@ -1226,6 +1226,16 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
   tNameGetFullDbName(pTableName, db);

   CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));

+  // REMOEV THIS ....
+  if (0 == tbMeta->vgId) {
+    SVgroupInfo vgroup = {0};
+
+    catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroup);
+
+    tbMeta->vgId = vgroup.vgId;
+  }
+  // REMOVE THIS ....
+
   if (tbMeta->tableType == TSDB_SUPER_TABLE) {
     CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
   } else {
...
source/libs/parser/src/astValidate.c
@@ -3628,6 +3628,33 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf)
   return TSDB_CODE_SUCCESS;
 }

+int32_t setTableVgroupList(SParseBasicCtx *pCtx, SName* name, SVgroupsInfo **pVgList) {
+  SArray* vgroupList = NULL;
+  int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList);
+  if (code != TSDB_CODE_SUCCESS) {
+    return code;
+  }
+
+  int32_t vgroupNum = taosArrayGetSize(vgroupList);
+
+  SVgroupsInfo* vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupNum);
+  vgList->numOfVgroups = vgroupNum;
+
+  for (int32_t i = 0; i < vgroupNum; ++i) {
+    SVgroupInfo *vg = taosArrayGet(vgroupList, i);
+    vgList->vgroups[i].vgId = vg->vgId;
+    vgList->vgroups[i].numOfEps = vg->numOfEps;
+    memcpy(vgList->vgroups[i].epAddr, vg->epAddr, sizeof(vgList->vgroups[i].epAddr));
+  }
+
+  *pVgList = vgList;
+  taosArrayDestroy(vgroupList);
+
+  return TSDB_CODE_SUCCESS;
+}
+
 int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen) {
   assert(pCtx != NULL && pInfo != NULL);
   int32_t code = 0;
...
@@ -3926,6 +3953,12 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
   pQueryInfo->pTableMetaInfo[0]->name = *name;
   pQueryInfo->numOfTables = 1;

+  code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList);
+  if (code != TSDB_CODE_SUCCESS) {
+    taosArrayDestroy(data.pTableMeta);
+    return code;
+  }
+
   // evaluate the sqlnode
   STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0);
   assert(pTableMeta != NULL);
...
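The new setTableVgroupList above sizes a single calloc as sizeof(SVgroupsInfo) + n * sizeof(SVgroupMsg) and then fills the trailing element array. Below is a minimal, self-contained sketch of that header-plus-trailing-array allocation pattern; Header and Item are hypothetical stand-ins and do not reproduce TDengine's real SVgroupsInfo/SVgroupMsg layouts.

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-ins for SVgroupsInfo / SVgroupMsg. */
typedef struct Item   { int id; } Item;
typedef struct Header {
  int  num;
  Item items[];   /* flexible array member, filled right after the header */
} Header;

static Header *buildList(const int *ids, int n) {
  /* one allocation: header plus n trailing items, zero-initialized */
  Header *h = calloc(1, sizeof(Header) + sizeof(Item) * n);
  if (h == NULL) return NULL;
  h->num = n;
  for (int i = 0; i < n; ++i) {
    h->items[i].id = ids[i];
  }
  return h;
}

int main(void) {
  int ids[] = {3, 7, 11};
  Header *h = buildList(ids, 3);
  if (h == NULL) return 1;
  for (int i = 0; i < h->num; ++i) printf("vg %d\n", h->items[i].id);
  free(h);
  return 0;
}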
source/libs/planner/src/physicalPlan.c
@@ -160,9 +160,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI
   return (SPhyNode*)node;
 }

-static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
-  return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
-}

 static bool isSystemTable(SQueryTableInfo* pTable) {
   // todo
...
@@ -259,12 +256,20 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
   return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
 }

+static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
+  vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);
+  return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
+}
+
 static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
   SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
   if (needMultiNodeScan(pTable)) {
     return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
   }
-  return createSingleTableScanNode(pPlanNode, pTable);
+  return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
 }

 static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
...
@@ -322,7 +327,7 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
   if (QNODE_MODIFY == pRoot->info.type) {
     splitModificationOpSubPlan(pCxt, pRoot);
   } else {
-    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
+    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
     ++(pCxt->nextId.templateId);
     subplan->msgType = TDMT_VND_QUERY;
...
source/libs/scheduler/inc/schedulerInt.h
@@ -37,8 +37,8 @@ enum {
 };

 typedef struct SSchedulerMgmt {
-  uint64_t      taskId;
-  uint64_t      sId;
+  uint64_t      taskId; // sequential taksId
+  uint64_t      sId;    // schedulerId
   SSchedulerCfg cfg;
   SHashObj     *jobs;   // key: queryId, value: SQueryJob*
 } SSchedulerMgmt;
...
@@ -83,52 +83,61 @@ typedef struct SSchJobAttr {
 typedef struct SSchJob {
   uint64_t             queryId;
-  int32_t              levelNum;
-  int32_t              levelIdx;
-  int8_t               status;
   SSchJobAttr          attr;
-  SEpSet               dataSrcEps;
-  SEpAddr              resEp;
+  int32_t              levelNum;
   void                *transport;
   SArray              *nodeList;   // qnode/vnode list, element is SQueryNodeAddr
-  SArray              *levels;     // Element is SQueryLevel, starting from 0. SArray<SSchLevel>
-  SArray              *subPlans;   // subplan pointer copied from DAG, no need to free it in scheduler
+  int32_t              levelIdx;
+  SEpSet               dataSrcEps;
+  SHashObj            *execTasks;  // executing tasks, key:taskid, value:SQueryTask*
+  SHashObj            *succTasks;  // succeed tasks, key:taskid, value:SQueryTask*
+  SHashObj            *failTasks;  // failed tasks, key:taskid, value:SQueryTask*
+  int8_t               status;
+  SQueryNodeAddr       resNode;
   tsem_t               rspSem;
   int32_t              userFetch;
   int32_t              remoteFetch;
   SSchTask            *fetchTask;
   int32_t              errCode;
   void                *res;
   int32_t              resNumOfRows;
-  SHashObj            *execTasks;  // executing tasks, key:taskid, value:SQueryTask*
-  SHashObj            *succTasks;  // succeed tasks, key:taskid, value:SQueryTask*
-  SHashObj            *failTasks;  // failed tasks, key:taskid, value:SQueryTask*
+  SArray              *levels;     // Element is SQueryLevel, starting from 0. SArray<SSchLevel>
+  SArray              *subPlans;   // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray<void*>
   SQueryProfileSummary summary;
 } SSchJob;

 #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
-#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
+#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
 #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
 #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)

-#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
-#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
+#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
+#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
+
+#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
+#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
+
+#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
+#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
+
+#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
+#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
+
+#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
+#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)

 #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
 #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
-#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
 #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)

 #define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
 #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))

-extern int32_t schLaunchTask(SSchJob *job, SSchTask *task);
-extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);
+static int32_t schLaunchTask(SSchJob *job, SSchTask *task);
+static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);

 #ifdef __cplusplus
 }
...
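The new SCH_SET_TASK_STATUS / SCH_GET_TASK_STATUS / SCH_SET_JOB_STATUS / SCH_GET_JOB_STATUS macros wrap TDengine's atomic_store_8 / atomic_load_8 helpers so 8-bit status fields can be read and written without a lock. A minimal standalone sketch of the same idea follows; it uses C11 stdatomic instead of the TDengine wrappers (an assumption made only so the sketch compiles on its own), and the DemoTask type and status names are invented for illustration.

#include <stdatomic.h>
#include <stdio.h>

enum { STATUS_NOT_START = 0, STATUS_EXECUTING = 1, STATUS_SUCCEED = 2, STATUS_FAILED = 3 };

typedef struct DemoTask {
  atomic_schar status;   /* 8-bit status field updated from multiple threads */
} DemoTask;

#define TASK_SET_STATUS(t, st) atomic_store(&(t)->status, (st))
#define TASK_GET_STATUS(t)     atomic_load(&(t)->status)

int main(void) {
  DemoTask task;
  atomic_init(&task.status, STATUS_NOT_START);

  TASK_SET_STATUS(&task, STATUS_EXECUTING);
  if (TASK_GET_STATUS(&task) == STATUS_EXECUTING) {
    printf("task is executing\n");
  }
  TASK_SET_STATUS(&task, STATUS_SUCCEED);
  printf("final status: %d\n", (int)TASK_GET_STATUS(&task));
  return 0;
}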
source/libs/scheduler/src/scheduler.c
@@ -20,203 +20,196 @@
 static SSchedulerMgmt schMgmt = {0};

-int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
-  for (int32_t i = 0; i < job->levelNum; ++i) {
-    SSchLevel *level = taosArrayGet(job->levels, i);
+int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
+  for (int32_t i = 0; i < pJob->levelNum; ++i) {
+    SSchLevel *pLevel = taosArrayGet(pJob->levels, i);

-    for (int32_t m = 0; m < level->taskNum; ++m) {
-      SSchTask *task = taosArrayGet(level->subTasks, m);
-      SSubplan *plan = task->plan;
-      int32_t childNum = plan->pChildren ? (int32_t)taosArrayGetSize(plan->pChildren) : 0;
-      int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
+    for (int32_t m = 0; m < pLevel->taskNum; ++m) {
+      SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
+      SSubplan *pPlan = pTask->plan;
+      int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0;
+      int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0;

       if (childNum > 0) {
-        task->children = taosArrayInit(childNum, POINTER_BYTES);
-        if (NULL == task->children) {
-          qError("taosArrayInit %d failed", childNum);
+        if (pJob->levelIdx == pLevel->level) {
+          SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
+          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+        }
+
+        pTask->children = taosArrayInit(childNum, POINTER_BYTES);
+        if (NULL == pTask->children) {
+          SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
           SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
         }
       }

       for (int32_t n = 0; n < childNum; ++n) {
-        SSubplan **child = taosArrayGet(plan->pChildren, n);
+        SSubplan **child = taosArrayGet(pPlan->pChildren, n);
         SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
         if (NULL == childTask || NULL == *childTask) {
-          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
+          SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
           SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
         }

-        if (NULL == taosArrayPush(task->children, childTask)) {
-          qError("taosArrayPush failed");
+        if (NULL == taosArrayPush(pTask->children, childTask)) {
+          SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
           SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
         }
       }

       if (parentNum > 0) {
-        task->parents = taosArrayInit(parentNum, POINTER_BYTES);
-        if (NULL == task->parents) {
-          qError("taosArrayInit %d failed", parentNum);
+        if (0 == pLevel->level) {
+          SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
+          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+        }
+
+        pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
+        if (NULL == pTask->parents) {
+          SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
           SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
         }
       }

       for (int32_t n = 0; n < parentNum; ++n) {
-        SSubplan **parent = taosArrayGet(plan->pParents, n);
+        SSubplan **parent = taosArrayGet(pPlan->pParents, n);
         SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES);
         if (NULL == parentTask || NULL == *parentTask) {
-          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
+          SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
           SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
         }

-        if (NULL == taosArrayPush(task->parents, parentTask)) {
-          qError("taosArrayPush failed");
+        if (NULL == taosArrayPush(pTask->parents, parentTask)) {
+          SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
           SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
         }
       }
+
+      SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
     }
   }

-  SSchLevel *level = taosArrayGet(job->levels, 0);
-  if (job->attr.queryJob && level->taskNum > 1) {
-    qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
-    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
-  }
-
-  SSchTask *task = taosArrayGet(level->subTasks, 0);
-  if (task->parents && taosArrayGetSize(task->parents) > 0) {
-    qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents));
+  SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
+  if (pJob->attr.queryJob && pLevel->taskNum > 1) {
+    SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
     SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
   }

   return TSDB_CODE_SUCCESS;
 }

-static SSchTask initTask(SSchJob *pJob, SSubplan *plan, SSchLevel *pLevel) {
-  SSchTask task = {0};
-
-  if (plan->type == QUERY_TYPE_MODIFY) {
-    pJob->attr.needFetch = false;
-  } else {
-    pJob->attr.queryJob = true;
-  }
-
-  task.plan = plan;
-  task.level = pLevel;
-  task.status = JOB_TASK_STATUS_NOT_START;
-  task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
-
-  return task;
+int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
+  pTask->plan = pPlan;
+  pTask->level = pLevel;
+  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
+  pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
+
+  return TSDB_CODE_SUCCESS;
 }

-static void cleanupTask(SSchTask *pTask) {
+void schFreeTask(SSchTask *pTask) {
   taosArrayDestroy(pTask->candidateAddrs);
 }

-int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) {
+int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
   int32_t code = 0;
-  pJob->queryId = dag->queryId;
+  pJob->queryId = pDag->queryId;

-  if (dag->numOfSubplans <= 0) {
-    qError("invalid subplan num:%d", dag->numOfSubplans);
+  if (pDag->numOfSubplans <= 0) {
+    SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
     SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }

-  int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
+  int32_t levelNum = (int32_t)taosArrayGetSize(pDag->pSubplans);
   if (levelNum <= 0) {
-    qError("invalid level num:%d", levelNum);
+    SCH_JOB_ELOG("invalid level num:%d", levelNum);
     SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }

   SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
   if (NULL == planToTask) {
-    qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
+    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
     SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }

   pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
   if (NULL == pJob->levels) {
-    qError("taosArrayInit %d failed", levelNum);
+    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
     SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }

-  //??
-  pJob->attr.needFetch = true;
-
   pJob->levelNum = levelNum;
   pJob->levelIdx = levelNum - 1;

-  pJob->subPlans = dag->pSubplans;
+  pJob->subPlans = pDag->pSubplans;

   SSchLevel level = {0};
-  SArray *levelPlans = NULL;
-  int32_t levelPlanNum = 0;
+  SArray *plans = NULL;
+  int32_t taskNum = 0;
   SSchLevel *pLevel = NULL;

   level.status = JOB_TASK_STATUS_NOT_START;

   for (int32_t i = 0; i < levelNum; ++i) {
     if (NULL == taosArrayPush(pJob->levels, &level)) {
-      qError("taosArrayPush failed");
+      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
       SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
     }

     pLevel = taosArrayGet(pJob->levels, i);
     pLevel->level = i;
-    levelPlans = taosArrayGetP(dag->pSubplans, i);
-    if (NULL == levelPlans) {
-      qError("no level plans for level %d", i);
+
+    plans = taosArrayGetP(pDag->pSubplans, i);
+    if (NULL == plans) {
+      SCH_JOB_ELOG("empty level plan, level:%d", i);
       SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
     }

-    levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
-    if (levelPlanNum <= 0) {
-      qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
+    taskNum = (int32_t)taosArrayGetSize(plans);
+    if (taskNum <= 0) {
+      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
       SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
     }

-    pLevel->taskNum = levelPlanNum;
-    pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask));
+    pLevel->taskNum = taskNum;
+
+    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
     if (NULL == pLevel->subTasks) {
-      qError("taosArrayInit %d failed", levelPlanNum);
+      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
       SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
     }

-    for (int32_t n = 0; n < levelPlanNum; ++n) {
-      SSubplan *plan = taosArrayGetP(levelPlans, n);
-      if (plan->type == QUERY_TYPE_MODIFY) {
-        pJob->attr.needFetch = false;
-      } else {
-        pJob->attr.queryJob = true;
-      }
-
-      SSchTask task = initTask(pJob, plan, pLevel);
+    for (int32_t n = 0; n < taskNum; ++n) {
+      SSubplan *plan = taosArrayGetP(plans, n);
+
+      SCH_SET_JOB_TYPE(&pJob->attr, plan->type);
+
+      SSchTask  task = {0};
+      SSchTask *pTask = &task;
+      schInitTask(pJob, &task, plan, pLevel);
+
       void *p = taosArrayPush(pLevel->subTasks, &task);
       if (NULL == p) {
-        qError("taosArrayPush failed");
+        SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
         SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
       }

       if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
-        qError("taosHashPut failed");
+        SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
         SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
       }
+
+      SCH_TASK_DLOG("task initialized, level:%d", pLevel->level);
     }
+
+    SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
   }

-  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
-
-  if (planToTask) {
-    taosHashCleanup(planToTask);
-  }
-
-  return TSDB_CODE_SUCCESS;
+  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));

 _return:
+  if (pLevel->subTasks) {
+    taosArrayDestroy(pLevel->subTasks);
+  }
+
   if (planToTask) {
     taosHashCleanup(planToTask);
...
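schValidateAndBuildJob leans on the SCH_ERR_RET / SCH_ERR_JRET macros defined in schedulerInt.h above: return immediately on error, or record the code and jump to the _return label so partially built state gets cleaned up. A small self-contained sketch of that goto-cleanup convention, using generic names and error codes rather than the scheduler's real types:

#include <stdio.h>
#include <stdlib.h>

#define CODE_SUCCESS 0
#define CODE_ERROR   1

/* return on error */
#define ERR_RET(c)  do { int _code = (c); if (_code != CODE_SUCCESS) { return _code; } } while (0)
/* record the error and jump to the cleanup label */
#define ERR_JRET(c) do { code = (c); if (code != CODE_SUCCESS) { goto _return; } } while (0)

static int buildJob(int planNum) {
  int  code  = CODE_SUCCESS;
  int *plans = NULL;

  ERR_RET(planNum > 0 ? CODE_SUCCESS : CODE_ERROR);   /* validate input before allocating */

  plans = calloc(planNum, sizeof(int));
  ERR_JRET(plans ? CODE_SUCCESS : CODE_ERROR);        /* allocation failure goes to cleanup */

  for (int i = 0; i < planNum; ++i) plans[i] = i;     /* ... build the job ... */
  printf("built %d plans\n", planNum);

_return:
  free(plans);                                        /* cleanup runs on both paths here */
  return code;
}

int main(void) { return buildJob(4); }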
@@ -225,42 +218,49 @@ _return:
   SCH_RET(code);
 }

-int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
-  if (task->candidateAddrs) {
+int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
+  if (NULL != pTask->candidateAddrs) {
     return TSDB_CODE_SUCCESS;
   }

-  task->candidateIdx = 0;
-  task->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
-  if (NULL == task->candidateAddrs) {
-    qError("taosArrayInit failed");
+  pTask->candidateIdx = 0;
+  pTask->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
+  if (NULL == pTask->candidateAddrs) {
+    SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CONDIDATE_EP_NUM);
     SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }

-  if (task->plan->execNode.numOfEps > 0) {
-    if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
-      qError("taosArrayPush failed");
+  if (pTask->plan->execNode.numOfEps > 0) {
+    if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
+      SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
       SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
     }

+    SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.numOfEps);
+
     return TSDB_CODE_SUCCESS;
   }

   int32_t addNum = 0;
+  int32_t nodeNum = 0;
+  if (pJob->nodeList) {
+    nodeNum = taosArrayGetSize(pJob->nodeList);

-  if (job->nodeList) {
-    int32_t nodeNum = (int32_t)taosArrayGetSize(job->nodeList);
-
     for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
-      SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i);
-
-      if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
-        qError("taosArrayPush failed");
+      SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
+
+      if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
+        SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
         SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
       }
+
+      ++addNum;
     }
   }

+  if (addNum <= 0) {
+    SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum);
+    return TSDB_CODE_QRY_INVALID_INPUT;
+  }
+
 /*
   for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
     strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
...
@@ -274,12 +274,19 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
 }

 int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
-  if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
-    qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId);
+  int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
+  if (0 != code) {
+    if (HASH_NODE_EXIST(code)) {
+      SCH_TASK_ELOG("task already in exec list, code:%x", code);
+      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+    }
+
+    SCH_TASK_ELOG("taosHashPut task to exec list failed, errno:%d", errno);
     SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }

-  qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), pJob->queryId);
+  SCH_TASK_DLOG("task added to exec list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));

   return TSDB_CODE_SUCCESS;
 }
...
@@ -347,7 +354,7 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
   job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;

   bool needFetch = job->userFetch;

-  if ((!job->attr.needFetch) && job->attr.syncSchedule) {
+  if ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule) {
     tsem_post(&job->rspSem);
   }
...
@@ -364,7 +371,7 @@ int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
   atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);

-  if (job->userFetch || ((!job->attr.needFetch) && job->attr.syncSchedule)) {
+  if (job->userFetch || ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule)) {
     tsem_post(&job->rspSem);
   }
...
@@ -378,50 +385,51 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
 }

-int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
+int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
   bool moved = false;

-  SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
+  SCH_ERR_RET(schMoveTaskToSuccList(pJob, pTask, &moved));
   if (!moved) {
-    SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status);
+    SCH_TASK_ELOG(" task may already moved, status:%d", pTask->status);
     return TSDB_CODE_SUCCESS;
   }

-  task->status = JOB_TASK_STATUS_SUCCEED;
+  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);

-  int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0;
+  int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
   if (parentNum == 0) {
-    if (task->plan->level != 0) {
-      qError("level error");
+    if (pTask->level->level != 0) {
+      SCH_TASK_ELOG("no parent task level error, level:%d", pTask->level->level);
       SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
     }

     int32_t taskDone = 0;

-    if (SCH_TASK_NEED_WAIT_ALL(task)) {
-      SCH_LOCK(SCH_WRITE, &task->level->lock);
-      task->level->taskSucceed++;
-      taskDone = task->level->taskSucceed + task->level->taskFailed;
-      SCH_UNLOCK(SCH_WRITE, &task->level->lock);
+    if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
+      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
+      pTask->level->taskSucceed++;
+      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
+      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);

-      if (taskDone < task->level->taskNum) {
-        qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
+      if (taskDone < pTask->level->taskNum) {
+        SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
         return TSDB_CODE_SUCCESS;
+      } else if (taskDone > pTask->level->taskNum) {
+        assert(0);
       }

-      if (task->level->taskFailed > 0) {
-        job->status = JOB_TASK_STATUS_FAILED;
-        SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR));
+      if (pTask->level->taskFailed > 0) {
+        pJob->status = JOB_TASK_STATUS_FAILED;
+        SCH_ERR_RET(schProcessOnJobFailure(pJob, TSDB_CODE_QRY_APP_ERROR));

         return TSDB_CODE_SUCCESS;
       }
     } else {
-      strncpy(job->resEp.fqdn, task->execAddr.epAddr[task->execAddr.inUse].fqdn, sizeof(job->resEp.fqdn));
-      job->resEp.port = task->execAddr.epAddr[task->execAddr.inUse].port;
+      pJob->resNode = pTask->execAddr;
     }

-    job->fetchTask = task;
-    SCH_ERR_RET(schProcessOnJobPartialSuccess(job));
+    pJob->fetchTask = pTask;
+
+    SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob));

     return TSDB_CODE_SUCCESS;
   }
...
@@ -436,53 +444,56 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
   */

   for (int32_t i = 0; i < parentNum; ++i) {
-    SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i);
+    SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);

-    ++par->childReady;
+    atomic_add_fetch_32(&par->childReady, 1);

-    SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));
+    SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr));

     if (SCH_TASK_READY_TO_LUNCH(par)) {
-      SCH_ERR_RET(schLaunchTask(job, par));
+      SCH_ERR_RET(schLaunchTask(pJob, par));
     }
   }

   return TSDB_CODE_SUCCESS;
 }

-int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
+int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
   bool needRetry = false;
   bool moved = false;
   int32_t taskDone = 0;

-  SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
+  SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));

   if (!needRetry) {
-    SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
+    SCH_TASK_ELOG("task failed[%x], no more retry", errCode);

-    SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
-    if (!moved) {
-      SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
+    if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
+      SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved));
+      if (!moved) {
+        SCH_TASK_ELOG("task may already moved, status:%d", pTask->status);
+      }
     }

-    if (SCH_TASK_NEED_WAIT_ALL(task)) {
-      SCH_LOCK(SCH_WRITE, &task->level->lock);
-      task->level->taskFailed++;
-      taskDone = task->level->taskSucceed + task->level->taskFailed;
-      SCH_UNLOCK(SCH_WRITE, &task->level->lock);
+    SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED);

-      if (taskDone < task->level->taskNum) {
-        qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
+    if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
+      SCH_LOCK(SCH_WRITE, &pTask->level->lock);
+      pTask->level->taskFailed++;
+      taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
+      SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
+
+      if (taskDone < pTask->level->taskNum) {
+        qDebug("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
         return TSDB_CODE_SUCCESS;
       }
     }

-    job->status = JOB_TASK_STATUS_FAILED;
-    SCH_ERR_RET(schProcessOnJobFailure(job, errCode));
+    SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode));

-    return TSDB_CODE_SUCCESS;
+    return errCode;
   }

-  SCH_ERR_RET(schLaunchTask(job, task));
+  SCH_ERR_RET(schLaunchTask(pJob, pTask));

   return TSDB_CODE_SUCCESS;
 }
...
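In the parent loop above, each finished task atomically bumps its parent's childReady counter, and SCH_TASK_READY_TO_LUNCH compares that counter with the number of children to decide whether the parent can be launched. A stripped-down sketch of that counting scheme, written with C11 atomics and invented DemoTask fields (the real scheduler keeps children in an SArray and uses its own atomic helpers):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct DemoTask {
  int        childNum;    /* how many children must finish first */
  atomic_int childReady;  /* bumped once per finished child */
} DemoTask;

/* mirrors the READY_TO_LUNCH idea: ready once every child has reported in */
static bool readyToLaunch(DemoTask *t) {
  return atomic_load(&t->childReady) >= t->childNum;
}

/* called when one child of `parent` succeeds */
static void onChildDone(DemoTask *parent) {
  atomic_fetch_add(&parent->childReady, 1);
  if (readyToLaunch(parent)) {
    printf("launch parent task\n");
  }
}

int main(void) {
  DemoTask parent = { .childNum = 2 };
  atomic_init(&parent.childReady, 0);
  onChildDone(&parent);  /* not ready yet */
  onChildDone(&parent);  /* now ready, launch */
  return 0;
}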
@@ -705,7 +716,7 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
 }

-int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
+int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
   uint32_t msgSize = 0;
   void *msg = NULL;
   int32_t code = 0;
...
@@ -713,22 +724,22 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
   switch (msgType) {
     case TDMT_VND_CREATE_TABLE:
     case TDMT_VND_SUBMIT: {
-      if (NULL == task->msg || task->msgLen <= 0) {
+      if (NULL == pTask->msg || pTask->msgLen <= 0) {
         qError("submit msg is NULL");
         SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
       }

-      msgSize = task->msgLen;
-      msg = task->msg;
+      msgSize = pTask->msgLen;
+      msg = pTask->msg;
       break;
     }
     case TDMT_VND_QUERY: {
-      if (NULL == task->msg) {
+      if (NULL == pTask->msg) {
         qError("query msg is NULL");
         SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
       }

-      msgSize = sizeof(SSubQueryMsg) + task->msgLen;
+      msgSize = sizeof(SSubQueryMsg) + pTask->msgLen;
       msg = calloc(1, msgSize);
       if (NULL == msg) {
         qError("calloc %d failed", msgSize);
...
@@ -737,12 +748,12 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
       SSubQueryMsg *pMsg = msg;

-      pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
+      pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
       pMsg->sId = htobe64(schMgmt.sId);
-      pMsg->queryId = htobe64(job->queryId);
-      pMsg->taskId = htobe64(task->taskId);
-      pMsg->contentLen = htonl(task->msgLen);
-      memcpy(pMsg->msg, task->msg, task->msgLen);
+      pMsg->queryId = htobe64(pJob->queryId);
+      pMsg->taskId = htobe64(pTask->taskId);
+      pMsg->contentLen = htonl(pTask->msgLen);
+      memcpy(pMsg->msg, pTask->msg, pTask->msgLen);
       break;
     }
     case TDMT_VND_RES_READY: {
...
@@ -755,14 +766,14 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
       SResReadyMsg *pMsg = msg;

-      pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
+      pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
       pMsg->sId = htobe64(schMgmt.sId);
-      pMsg->queryId = htobe64(job->queryId);
-      pMsg->taskId = htobe64(task->taskId);
+      pMsg->queryId = htobe64(pJob->queryId);
+      pMsg->taskId = htobe64(pTask->taskId);
       break;
     }
     case TDMT_VND_FETCH: {
-      if (NULL == task) {
+      if (NULL == pTask) {
         SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
       }

       msgSize = sizeof(SResFetchMsg);
...
@@ -774,10 +785,10 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
       SResFetchMsg *pMsg = msg;

-      pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
+      pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
       pMsg->sId = htobe64(schMgmt.sId);
-      pMsg->queryId = htobe64(job->queryId);
-      pMsg->taskId = htobe64(task->taskId);
+      pMsg->queryId = htobe64(pJob->queryId);
+      pMsg->taskId = htobe64(pTask->taskId);
       break;
     }
     case TDMT_VND_DROP_TASK:{
...
@@ -790,10 +801,10 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
       STaskDropMsg *pMsg = msg;

-      pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
+      pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
       pMsg->sId = htobe64(schMgmt.sId);
-      pMsg->queryId = htobe64(job->queryId);
-      pMsg->taskId = htobe64(task->taskId);
+      pMsg->queryId = htobe64(pJob->queryId);
+      pMsg->taskId = htobe64(pTask->taskId);
       break;
     }
     default:
...
@@ -803,11 +814,11 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
   }

   SEpSet epSet;
-  SQueryNodeAddr *addr = taosArrayGet(task->candidateAddrs, task->candidateIdx);
+  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);

   schConvertAddrToEpSet(addr, &epSet);

-  SCH_ERR_JRET(schAsyncSendMsg(job->transport, &epSet, job->queryId, task->taskId, msgType, msg, msgSize));
+  SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));

   return TSDB_CODE_SUCCESS;
...
@@ -817,33 +828,62 @@ _return:
   SCH_RET(code);
 }

-int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
-  SSubplan *plan = task->plan;
-  SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
-  SCH_ERR_RET(schSetTaskCandidateAddrs(job, task));
-
-  if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) {
-    SCH_TASK_ERR_LOG("no valid candidate node for task:%"PRIx64, task->taskId);
-    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
-  }
+static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
+  int8_t status = SCH_GET_JOB_STATUS(pJob);
+  if (pStatus) {
+    *pStatus = status;
+  }
+
+  return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED
+       || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING);
+}
+
+int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
+  int8_t status = 0;
+  int32_t code = 0;
+
+  if (schJobNeedToStop(pJob, &status)) {
+    SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status);
+    SCH_ERR_RET(pJob->errCode);
+  }
+
+  SSubplan *plan = pTask->plan;
+
+  if (NULL == pTask->msg) {
+    code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
+    if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) {
+      SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen);
+      SCH_ERR_JRET(code);
+    }
+  }
+
+  SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));

   // NOTE: race condition: the task should be put into the hash table before send msg to server
-  SCH_ERR_RET(schPushTaskToExecList(job, task));
-  SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
+  SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
+
+  SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
+
+  SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, plan->msgType));

-  task->status = JOB_TASK_STATUS_EXECUTING;
   return TSDB_CODE_SUCCESS;
+
+_return:
+  code = schProcessOnTaskFailure(pJob, pTask, code);
+  SCH_RET(code);
 }

-int32_t schLaunchJob(SSchJob *job) {
-  SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
+int32_t schLaunchJob(SSchJob *pJob) {
+  SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
   for (int32_t i = 0; i < level->taskNum; ++i) {
-    SSchTask *task = taosArrayGet(level->subTasks, i);
-    SCH_ERR_RET(schLaunchTask(job, task));
+    SSchTask *pTask = taosArrayGet(level->subTasks, i);
+    SCH_ERR_RET(schLaunchTask(pJob, pTask));
   }

-  job->status = JOB_TASK_STATUS_EXECUTING;
+  pJob->status = JOB_TASK_STATUS_EXECUTING;

   return TSDB_CODE_SUCCESS;
 }
...
@@ -876,116 +916,118 @@ void schDropJobAllTasks(SSchJob *job) {
   }
 }

-uint64_t schGenSchId(void) {
-  uint64_t sId = 0;
-
-  // TODO
-
-  qDebug("Gen sId:0x%"PRIx64, sId);
-
-  return sId;
-}
-
-int32_t schedulerInit(SSchedulerCfg *cfg) {
-  if (schMgmt.jobs) {
-    qError("scheduler already init");
-    return TSDB_CODE_QRY_INVALID_INPUT;
-  }
-
-  if (cfg) {
-    schMgmt.cfg = *cfg;
-
-    if (schMgmt.cfg.maxJobNum <= 0) {
-      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
-    }
-  } else {
-    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
-  }
-
-  schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
-  if (NULL == schMgmt.jobs) {
-    SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
-  }
-
-  schMgmt.sId = schGenSchId();
-
-  return TSDB_CODE_SUCCESS;
-}
-
-int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
+int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) {
   if (nodeList && taosArrayGetSize(nodeList) <= 0) {
-    qInfo("qnodeList is empty");
+    qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
   }

   int32_t code = 0;
-  SSchJob *job = calloc(1, sizeof(SSchJob));
-  if (NULL == job) {
+  SSchJob *pJob = calloc(1, sizeof(SSchJob));
+  if (NULL == pJob) {
+    qError("QID:%"PRIx64" calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
     SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }

-  job->attr.syncSchedule = syncSchedule;
-  job->transport = transport;
-  job->nodeList = nodeList;
+  pJob->attr.syncSchedule = syncSchedule;
+  pJob->transport = transport;
+  pJob->nodeList = nodeList;

-  SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
+  SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));

-  job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
-  if (NULL == job->execTasks) {
-    qError("taosHashInit %d failed", pDag->numOfSubplans);
+  pJob->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+  if (NULL == pJob->execTasks) {
+    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
     SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }

-  job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
-  if (NULL == job->succTasks) {
-    qError("taosHashInit %d failed", pDag->numOfSubplans);
+  pJob->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+  if (NULL == pJob->succTasks) {
+    SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
     SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }

-  job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
-  if (NULL == job->failTasks) {
-    qError("taosHashInit %d failed", pDag->numOfSubplans);
+  pJob->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+  if (NULL == pJob->failTasks) {
+    SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
     SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }

-  tsem_init(&job->rspSem, 0, 0);
+  tsem_init(&pJob->rspSem, 0, 0);

-  code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES);
+  code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES);
   if (0 != code) {
     if (HASH_NODE_EXIST(code)) {
-      qError("taosHashPut queryId:0x%"PRIx64" already exist", job->queryId);
+      SCH_JOB_ELOG("job already exist, isQueryJob:%d", pJob->attr.queryJob);
       SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
     } else {
-      qError("taosHashPut queryId:0x%"PRIx64" failed", job->queryId);
-      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
+      SCH_JOB_ELOG("taosHashPut job failed, errno:%d", errno);
+      SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
     }
   }

-  job->status = JOB_TASK_STATUS_NOT_START;
+  pJob->status = JOB_TASK_STATUS_NOT_START;

-  SCH_ERR_JRET(schLaunchJob(job));
+  SCH_ERR_JRET(schLaunchJob(pJob));

-  *(SSchJob **)pJob = job;
+  *(SSchJob **)job = pJob;

   if (syncSchedule) {
-    tsem_wait(&job->rspSem);
+    SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob));
+    tsem_wait(&pJob->rspSem);
   }

+  SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob));
+
   return TSDB_CODE_SUCCESS;

 _return:

-  *(SSchJob **)pJob = NULL;
-  scheduleFreeJob(job);
+  *(SSchJob **)job = NULL;
+  scheduleFreeJob(pJob);

   SCH_RET(code);
 }

+int32_t schedulerInit(SSchedulerCfg *cfg) {
+  if (schMgmt.jobs) {
+    qError("scheduler already initialized");
+    return TSDB_CODE_QRY_INVALID_INPUT;
+  }
+
+  if (cfg) {
+    schMgmt.cfg = *cfg;
+
+    if (schMgmt.cfg.maxJobNum == 0) {
+      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
+    }
+  } else {
+    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
+  }
+
+  schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+  if (NULL == schMgmt.jobs) {
+    qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum);
+    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
+    qError("generate schdulerId failed, errno:%d", errno);
+    SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
+  }
+
+  qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);
+
+  return TSDB_CODE_SUCCESS;
+}
+
 int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) {
   if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
     SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }

-  SCH_ERR_RET(scheduleExecJobImpl(transport, nodeList, pDag, pJob, true));
+  SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true));

   SSchJob *job = *(SSchJob **)pJob;
...
@@ -1000,7 +1042,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
     SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }

-  return scheduleExecJobImpl(transport, nodeList, pDag, pJob, false);
+  return schExecJobImpl(transport, nodeList, pDag, pJob, false);
 }
...
@@ -1012,7 +1054,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
   SSchJob *job = pJob;
   int32_t code = 0;

-  if (!job->attr.needFetch) {
+  if (!SCH_JOB_NEED_FETCH(&job->attr)) {
     qError("no need to fetch data");
     SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
   }
...
@@ -1091,7 +1133,7 @@ void scheduleFreeJob(void *pJob) {
     int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
     for (int32_t j = 0; j < numOfTasks; ++j) {
       SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
-      cleanupTask(pTask);
+      schFreeTask(pTask);
     }

     taosArrayDestroy(pLevel->subTasks);
...