Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
13d44067
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
13d44067
编写于
12月 25, 2021
作者:
D
dapan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
catalog ut
上级
7c9a9d21
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
335 addition
and
55 deletion
+335
-55
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+1
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+23
-5
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+140
-20
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+2
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+37
-22
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+132
-7
未找到文件。
include/libs/catalog/catalog.h
浏览文件 @
13d44067
...
...
@@ -110,7 +110,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
);
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
*
pVgroupList
);
/**
* Get a table's vgroup from its name's hash value.
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
13d44067
...
...
@@ -197,15 +197,21 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetVgInfoFromDB
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SDBVgroupInfo
*
dbInfo
,
SArray
*
vgroupList
)
{
int32_t
ctgGetVgInfoFromDB
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SDBVgroupInfo
*
dbInfo
,
SArray
*
*
vgroupList
)
{
SHashObj
*
vgroupHash
=
NULL
;
SVgroupInfo
*
vgInfo
=
NULL
;
*
vgroupList
=
taosArrayInit
(
taosHashGetSize
(
dbInfo
->
vgInfo
),
sizeof
(
SVgroupInfo
));
if
(
NULL
==
*
vgroupList
)
{
ctgError
(
"taosArrayInit failed"
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
void
*
pIter
=
taosHashIterate
(
dbInfo
->
vgInfo
,
NULL
);
while
(
pIter
)
{
vgInfo
=
pIter
;
if
(
NULL
==
taosArrayPush
(
vgroupList
,
vgInfo
))
{
if
(
NULL
==
taosArrayPush
(
*
vgroupList
,
vgInfo
))
{
ctgError
(
"taosArrayPush failed"
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
...
...
@@ -522,7 +528,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
return
ctgGetTableMetaImpl
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
pTableName
,
true
,
pTableMeta
);
}
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
)
{
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
*
pVgroupList
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pDBName
||
NULL
==
pTableName
||
NULL
==
pVgroupList
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
...
...
@@ -542,17 +548,29 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
int32_t
vgId
=
tbMeta
->
vgId
;
if
(
NULL
==
taosHashGetClone
(
dbVgroup
.
vgInfo
,
&
vgId
,
sizeof
(
vgId
),
&
vgroupInfo
))
{
ctgError
(
"vgId[%d] not found in vgroup list"
,
vgId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_
J
RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
if
(
NULL
==
taosArrayPush
(
pVgroupList
,
&
vgroupInfo
))
{
*
pVgroupList
=
taosArrayInit
(
1
,
sizeof
(
SVgroupInfo
));
if
(
NULL
==
*
pVgroupList
)
{
ctgError
(
"taosArrayInit failed"
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
if
(
NULL
==
taosArrayPush
(
*
pVgroupList
,
&
vgroupInfo
))
{
ctgError
(
"push vgroupInfo to array failed"
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
tfree
(
tbMeta
);
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
tbMeta
);
taosArrayDestroy
(
*
pVgroupList
);
CTG_RET
(
code
);
}
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
13d44067
...
...
@@ -36,6 +36,7 @@
namespace
{
void
ctgTestSetPrepareTableMeta
();
void
ctgTestSetPrepareCTableMeta
();
void
ctgTestSetPrepareSTableMeta
();
...
...
@@ -122,21 +123,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
return
;
}
void
ctgTestPrepareDbVgroupsAndChildMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareCTableMeta
();
return
;
}
void
ctgTestPrepareDbVgroupsAndSuperMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareSTableMeta
();
return
;
}
void
ctgTestPrepareTableMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
...
...
@@ -262,6 +249,32 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
return
;
}
void
ctgTestPrepareDbVgroupsAndNormalMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareTableMeta
();
return
;
}
void
ctgTestPrepareDbVgroupsAndChildMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareCTableMeta
();
return
;
}
void
ctgTestPrepareDbVgroupsAndSuperMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareSTableMeta
();
return
;
}
void
ctgTestSetPrepareDbVgroups
()
{
static
Stub
stub
;
...
...
@@ -315,6 +328,19 @@ void ctgTestSetPrepareSTableMeta() {
}
}
void
ctgTestSetPrepareDbVgroupsAndNormalMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareDbVgroupsAndNormalMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroupsAndNormalMeta
);
}
}
}
void
ctgTestSetPrepareDbVgroupsAndChildMeta
()
{
static
Stub
stub
;
...
...
@@ -324,7 +350,7 @@ void ctgTestSetPrepareDbVgroupsAndChildMeta() {
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
(
rpcSendRecv
,
ctgTestPrepareDbVgroupsAndChildMeta
)
);
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroupsAndChildMeta
);
}
}
}
...
...
@@ -337,7 +363,7 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() {
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
(
rpcSendRecv
,
ctgTestPrepareDbVgroupsAndSuperMeta
)
);
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroupsAndSuperMeta
);
}
}
}
...
...
@@ -345,7 +371,7 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() {
}
TEST
(
t
estCase
,
normalTableCas
e
)
{
TEST
(
t
ableMeta
,
normalTabl
e
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
...
...
@@ -396,7 +422,7 @@ TEST(testCase, normalTableCase) {
catalogDestroy
();
}
TEST
(
t
estCase
,
childTableCase
)
{
TEST
(
t
ableMeta
,
childTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
...
...
@@ -452,7 +478,7 @@ TEST(testCase, childTableCase) {
catalogDestroy
();
}
TEST
(
t
estCase
,
superTableCase
)
{
TEST
(
t
ableMeta
,
superTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
...
...
@@ -496,7 +522,7 @@ TEST(testCase, superTableCase) {
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
tableMeta
);
code
=
catalog
RenewAnd
GetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
9
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_CHILD_TABLE
);
...
...
@@ -512,6 +538,100 @@ TEST(testCase, superTableCase) {
catalogDestroy
();
}
TEST
(
tableDistVgroup
,
normalTable
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
*
vgInfo
=
NULL
;
SArray
*
vgList
=
NULL
;
ctgTestSetPrepareDbVgroupsAndNormalMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestTablename
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
8
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
3
);
catalogDestroy
();
}
TEST
(
tableDistVgroup
,
childTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
*
vgInfo
=
NULL
;
SArray
*
vgList
=
NULL
;
ctgTestSetPrepareDbVgroupsAndChildMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
9
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
4
);
catalogDestroy
();
}
TEST
(
tableDistVgroup
,
superTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
*
vgInfo
=
NULL
;
SArray
*
vgList
=
NULL
;
ctgTestSetPrepareDbVgroupsAndSuperMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestSTablename
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
10
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
1
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
1
);
ASSERT_EQ
(
vgInfo
->
vgId
,
2
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
2
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
2
);
ASSERT_EQ
(
vgInfo
->
vgId
,
3
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
3
);
catalogDestroy
();
}
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
13d44067
...
...
@@ -88,6 +88,8 @@ typedef struct SQueryJob {
int32_t
userFetch
;
int32_t
remoteFetch
;
SQueryTask
*
fetchTask
;
int32_t
errCode
;
void
*
res
;
int32_t
resNumOfRows
;
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
13d44067
...
...
@@ -70,14 +70,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
}
for
(
int32_t
n
=
0
;
n
<
childNum
;
++
n
)
{
SSubplan
*
child
=
taosArrayGet
(
plan
->
pChildern
,
n
);
SQueryTask
*
childTask
=
taosHashGet
(
planToTask
,
&
child
,
POINTER_BYTES
);
if
(
childTask
)
{
SSubplan
*
*
child
=
taosArrayGet
(
plan
->
pChildern
,
n
);
SQueryTask
*
*
childTask
=
taosHashGet
(
planToTask
,
child
,
POINTER_BYTES
);
if
(
NULL
==
childTask
||
NULL
==
*
childTask
)
{
qError
(
"subplan relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
if
(
NULL
==
taosArrayPush
(
task
->
children
,
&
childTask
))
{
if
(
NULL
==
taosArrayPush
(
task
->
children
,
childTask
))
{
qError
(
"taosArrayPush failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -92,14 +92,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
}
for
(
int32_t
n
=
0
;
n
<
parentNum
;
++
n
)
{
SSubplan
*
parent
=
taosArrayGet
(
plan
->
pParents
,
n
);
SQueryTask
*
parentTask
=
taosHashGet
(
planToTask
,
&
parent
,
POINTER_BYTES
);
if
(
parentTask
)
{
SSubplan
*
*
parent
=
taosArrayGet
(
plan
->
pParents
,
n
);
SQueryTask
*
*
parentTask
=
taosHashGet
(
planToTask
,
parent
,
POINTER_BYTES
);
if
(
NULL
==
parentTask
||
NULL
==
*
parentTask
)
{
qError
(
"subplan relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
if
(
NULL
==
taosArrayPush
(
task
->
parents
,
&
parentTask
))
{
if
(
NULL
==
taosArrayPush
(
task
->
parents
,
parentTask
))
{
qError
(
"taosArrayPush failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -296,8 +296,7 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
int32_t
schMoveTaskToFailList
(
SQueryJob
*
job
,
SQueryTask
*
task
,
bool
*
moved
)
{
if
(
0
!=
taosHashRemove
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
)))
{
qWarn
(
"remove task[%"
PRIx64
"] from execTasks failed"
,
task
->
taskId
);
return
TSDB_CODE_SUCCESS
;
qWarn
(
"remove task[%"
PRIx64
"] from execTasks failed, it may not exist"
,
task
->
taskId
);
}
if
(
0
!=
taosHashPut
(
job
->
failTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
...
...
@@ -363,6 +362,10 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
break
;
}
case
TSDB_MSG_TYPE_FETCH
:
{
if
(
NULL
==
task
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
msgSize
=
sizeof
(
SResFetchMsg
);
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
...
...
@@ -419,7 +422,7 @@ int32_t schFetchFromRemote(SQueryJob *job) {
return
TSDB_CODE_SUCCESS
;
}
SCH_ERR_JRET
(
schAsyncSendMsg
(
job
,
NULL
,
TSDB_MSG_TYPE_FETCH
));
SCH_ERR_JRET
(
schAsyncSendMsg
(
job
,
job
->
fetchTask
,
TSDB_MSG_TYPE_FETCH
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -444,8 +447,9 @@ int32_t schProcessOnJobPartialSuccess(SQueryJob *job) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schProcessOnJobFailure
(
SQueryJob
*
job
)
{
int32_t
schProcessOnJobFailure
(
SQueryJob
*
job
,
int32_t
errCode
)
{
job
->
status
=
JOB_TASK_STATUS_FAILED
;
job
->
errCode
=
errCode
;
atomic_val_compare_exchange_32
(
&
job
->
remoteFetch
,
1
,
0
);
...
...
@@ -474,7 +478,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
task
->
status
=
JOB_TASK_STATUS_SUCCEED
;
int32_t
parentNum
=
(
int32_t
)
taosArrayGetSize
(
task
->
parents
)
;
int32_t
parentNum
=
task
->
parents
?
(
int32_t
)
taosArrayGetSize
(
task
->
parents
)
:
0
;
if
(
parentNum
==
0
)
{
if
(
task
->
plan
->
level
!=
0
)
{
qError
(
"level error"
);
...
...
@@ -496,7 +500,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
if
(
task
->
level
->
taskFailed
>
0
)
{
job
->
status
=
JOB_TASK_STATUS_FAILED
;
SCH_ERR_RET
(
schProcessOnJobFailure
(
job
));
SCH_ERR_RET
(
schProcessOnJobFailure
(
job
,
TSDB_CODE_QRY_APP_ERROR
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -505,6 +509,8 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
job
->
resEp
.
port
=
task
->
execAddr
.
port
;
}
job
->
fetchTask
=
task
;
SCH_ERR_RET
(
schProcessOnJobPartialSuccess
(
job
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -518,14 +524,14 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
}
for
(
int32_t
i
=
0
;
i
<
parentNum
;
++
i
)
{
SQueryTask
*
par
=
taosArrayGet
(
task
->
parents
,
i
);
SQueryTask
*
par
=
*
(
SQueryTask
**
)
taosArrayGet
(
task
->
parents
,
i
);
++
par
->
childReady
;
SCH_ERR_RET
(
qSetSubplanExecutionNode
(
par
->
plan
,
task
->
plan
->
id
.
templateId
,
&
task
->
execAddr
));
if
(
SCH_TASK_READY_TO_LUNCH
(
par
))
{
SCH_ERR_RET
(
schLaunchTask
(
job
,
task
));
SCH_ERR_RET
(
schLaunchTask
(
job
,
par
));
}
}
...
...
@@ -544,7 +550,6 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
SCH_ERR_RET
(
schMoveTaskToFailList
(
job
,
task
,
&
moved
));
if
(
!
moved
)
{
SCH_TASK_ERR_LOG
(
"task may already moved, status:%d"
,
task
->
status
);
return
TSDB_CODE_SUCCESS
;
}
if
(
SCH_TASK_NEED_WAIT_ALL
(
task
))
{
...
...
@@ -560,7 +565,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
}
job
->
status
=
JOB_TASK_STATUS_FAILED
;
SCH_ERR_RET
(
schProcessOnJobFailure
(
job
));
SCH_ERR_RET
(
schProcessOnJobFailure
(
job
,
errCode
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -617,9 +622,6 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char
case
TSDB_MSG_TYPE_FETCH
:
{
SCH_ERR_JRET
(
rspCode
);
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
msg
;
if
(
rsp
->
completed
)
{
job
->
status
=
JOB_TASK_STATUS_SUCCEED
;
}
job
->
res
=
rsp
;
job
->
resNumOfRows
=
rsp
->
numOfRows
;
...
...
@@ -639,7 +641,7 @@ _task_error:
return
TSDB_CODE_SUCCESS
;
_return:
code
=
schProcessOnJobFailure
(
job
);
code
=
schProcessOnJobFailure
(
job
,
code
);
return
code
;
}
...
...
@@ -821,6 +823,11 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
job
->
status
==
JOB_TASK_STATUS_FAILED
)
{
job
->
res
=
NULL
;
SCH_RET
(
job
->
errCode
);
}
if
(
job
->
status
==
JOB_TASK_STATUS_SUCCEED
)
{
job
->
res
=
NULL
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -837,6 +844,14 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
tsem_wait
(
&
job
->
rspSem
);
if
(
job
->
status
==
JOB_TASK_STATUS_FAILED
)
{
code
=
job
->
errCode
;
}
if
(
job
->
res
&&
((
SRetrieveTableRsp
*
)
job
->
res
)
->
completed
)
{
job
->
status
=
JOB_TASK_STATUS_SUCCEED
;
}
*
data
=
job
->
res
;
job
->
res
=
NULL
;
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
13d44067
...
...
@@ -30,10 +30,16 @@
#include "scheduler.h"
#include "tep.h"
#include "trpc.h"
#include "schedulerInt.h"
#include "stub.h"
#include "addr_any.h"
namespace
{
void
mockBuildDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x111111111111
;
extern
"C"
int32_t
schHandleRspMsg
(
SQueryJob
*
job
,
SQueryTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
void
schtBuildDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x0000000000000001
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
...
...
@@ -45,22 +51,26 @@ void mockBuildDag(SQueryDag *dag) {
SSubplan
mergePlan
=
{
0
};
scanPlan
.
id
.
queryId
=
qId
;
scanPlan
.
id
.
templateId
=
0x
222222222
2
;
scanPlan
.
id
.
subplanId
=
0x
333333333
3
;
scanPlan
.
id
.
templateId
=
0x
000000000000000
2
;
scanPlan
.
id
.
subplanId
=
0x
000000000000000
3
;
scanPlan
.
type
=
QUERY_TYPE_SCAN
;
scanPlan
.
level
=
1
;
scanPlan
.
execEpSet
.
numOfEps
=
1
;
scanPlan
.
execEpSet
.
port
[
0
]
=
6030
;
strcpy
(
scanPlan
.
execEpSet
.
fqdn
[
0
],
"ep0"
);
scanPlan
.
pChildern
=
NULL
;
scanPlan
.
pParents
=
taosArrayInit
(
1
,
POINTER_BYTES
);
scanPlan
.
pNode
=
(
SPhyNode
*
)
calloc
(
1
,
sizeof
(
SPhyNode
));
mergePlan
.
id
.
queryId
=
qId
;
mergePlan
.
id
.
templateId
=
0x4444444444
;
mergePlan
.
id
.
subplanId
=
0x5555555555
;
mergePlan
.
type
=
QUERY_TYPE_MERGE
;
mergePlan
.
level
=
0
;
mergePlan
.
execEpSet
.
numOfEps
=
1
;
mergePlan
.
execEpSet
.
numOfEps
=
0
;
mergePlan
.
pChildern
=
taosArrayInit
(
1
,
POINTER_BYTES
);
mergePlan
.
pParents
=
NULL
;
mergePlan
.
pNode
=
(
SPhyNode
*
)
calloc
(
1
,
sizeof
(
SPhyNode
));
SSubplan
*
mergePointer
=
(
SSubplan
*
)
taosArrayPush
(
merge
,
&
mergePlan
);
SSubplan
*
scanPointer
=
(
SSubplan
*
)
taosArrayPush
(
scan
,
&
scanPlan
);
...
...
@@ -72,9 +82,47 @@ void mockBuildDag(SQueryDag *dag) {
taosArrayPush
(
dag
->
pSubplans
,
&
scan
);
}
int32_t
schtPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
,
int32_t
*
len
)
{
*
str
=
(
char
*
)
calloc
(
1
,
20
);
*
len
=
20
;
return
0
;
}
int32_t
schtExecNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SEpAddr
*
ep
)
{
return
0
;
}
void
schtSetPlanToString
()
{
static
Stub
stub
;
stub
.
set
(
qSubPlanToString
,
schtPlanToString
);
{
AddrAny
any
(
"libplanner.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^qSubPlanToString$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
schtPlanToString
);
}
}
}
void
schtSetExecNode
()
{
static
Stub
stub
;
stub
.
set
(
qSetSubplanExecutionNode
,
schtExecNode
);
{
AddrAny
any
(
"libplanner.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^qSetSubplanExecutionNode$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
schtExecNode
);
}
}
}
}
TEST
(
testCase
,
normalCase
)
{
TEST
(
queryTest
,
normalCase
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
char
*
clusterId
=
"cluster1"
;
char
*
dbname
=
"1.db1"
;
...
...
@@ -83,14 +131,91 @@ TEST(testCase, normalCase) {
void
*
pJob
=
NULL
;
SQueryDag
dag
=
{
0
};
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEpAddr
));
SEpAddr
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
int32_t
code
=
schedulerInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
mockBuildDag
(
&
dag
);
schtBuildDag
(
&
dag
);
schtSetPlanToString
();
schtSetExecNode
();
code
=
scheduleAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
pJob
);
ASSERT_EQ
(
code
,
0
);
SQueryJob
*
job
=
(
SQueryJob
*
)
pJob
;
void
*
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SQueryTask
*
task
=
*
(
SQueryTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TSDB_MSG_TYPE_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SQueryTask
*
task
=
*
(
SQueryTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TSDB_MSG_TYPE_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SQueryTask
*
task
=
*
(
SQueryTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TSDB_MSG_TYPE_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SQueryTask
*
task
=
*
(
SQueryTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TSDB_MSG_TYPE_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
SRetrieveTableRsp
rsp
=
{
0
};
rsp
.
completed
=
1
;
rsp
.
numOfRows
=
10
;
code
=
schHandleRspMsg
(
job
,
NULL
,
TSDB_MSG_TYPE_FETCH
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
void
*
data
=
NULL
;
code
=
scheduleFetchRows
(
job
,
&
data
);
ASSERT_EQ
(
code
,
0
);
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
ASSERT_EQ
(
pRsp
->
completed
,
1
);
ASSERT_EQ
(
pRsp
->
numOfRows
,
10
);
data
=
NULL
;
code
=
scheduleFetchRows
(
job
,
&
data
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
data
,
(
void
*
)
NULL
);
scheduleFreeJob
(
pJob
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录