Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
408b1168
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
408b1168
编写于
12月 27, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
12月 27, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9397 from taosdata/feature/qnode
Feature/qnode
上级
c5b42e57
0be19201
变更
14
显示空白变更内容
内联
并排
Showing
14 changed file
with
1227 addition
and
224 deletion
+1227
-224
CMakeLists.txt
CMakeLists.txt
+1
-0
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+2
-2
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+10
-2
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+27
-16
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+526
-32
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+17
-16
source/libs/qworker/CMakeLists.txt
source/libs/qworker/CMakeLists.txt
+2
-0
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+5
-0
source/libs/qworker/test/CMakeLists.txt
source/libs/qworker/test/CMakeLists.txt
+18
-0
source/libs/qworker/test/qworkerTests.cpp
source/libs/qworker/test/qworkerTests.cpp
+120
-0
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+20
-9
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+183
-89
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+295
-57
未找到文件。
CMakeLists.txt
浏览文件 @
408b1168
...
@@ -11,6 +11,7 @@ set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib")
...
@@ -11,6 +11,7 @@ set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib")
include
(
${
CMAKE_SUPPORT_DIR
}
/cmake.options
)
include
(
${
CMAKE_SUPPORT_DIR
}
/cmake.options
)
SET
(
CMAKE_C_FLAGS
"
${
CMAKE_C_FLAGS
}
-fPIC -gdwarf-2 -msse4.2 -mfma -g3"
)
SET
(
CMAKE_C_FLAGS
"
${
CMAKE_C_FLAGS
}
-fPIC -gdwarf-2 -msse4.2 -mfma -g3"
)
SET
(
CMAKE_CXX_FLAGS
"
${
CMAKE_CXX_FLAGS
}
-fPIC -gdwarf-2 -msse4.2 -mfma -g3"
)
# contrib
# contrib
add_subdirectory
(
contrib
)
add_subdirectory
(
contrib
)
...
...
include/libs/catalog/catalog.h
浏览文件 @
408b1168
...
@@ -110,7 +110,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
...
@@ -110,7 +110,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
* @return error code
*/
*/
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
);
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
*
pVgroupList
);
/**
/**
* Get a table's vgroup from its name's hash value.
* Get a table's vgroup from its name's hash value.
...
@@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter
...
@@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter
int32_t
catalogGetAllMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
);
int32_t
catalogGetAllMeta
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
SCatalogReq
*
pReq
,
SMetaData
*
pRsp
);
int32_t
catalogGetQnodeList
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
S
EpSet
*
pQnodeEpSe
t
);
int32_t
catalogGetQnodeList
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
S
Array
*
pQnodeLis
t
);
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
408b1168
...
@@ -59,7 +59,15 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
...
@@ -59,7 +59,15 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param qnodeList Qnode address list, element is SEpAddr
* @param qnodeList Qnode address list, element is SEpAddr
* @return
* @return
*/
*/
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
);
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
uint64_t
*
numOfRows
);
/**
* Process the query job, generated according to the query physical plan.
* This is a asynchronized API, and is also thread-safety.
* @param qnodeList Qnode address list, element is SEpAddr
* @return
*/
int32_t
scheduleAsyncExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
);
int32_t
scheduleFetchRows
(
void
*
pJob
,
void
**
data
);
int32_t
scheduleFetchRows
(
void
*
pJob
,
void
**
data
);
...
...
source/client/src/clientImpl.c
浏览文件 @
408b1168
...
@@ -209,7 +209,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
...
@@ -209,7 +209,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
}
}
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
,
void
**
pJob
)
{
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
,
void
**
pJob
)
{
return
scheduleExecJob
(
pRequest
->
pTscObj
->
pTransporter
,
NULL
/*todo appInfo.xxx*/
,
pDag
,
pJob
);
return
schedule
Async
ExecJob
(
pRequest
->
pTscObj
->
pTransporter
,
NULL
/*todo appInfo.xxx*/
,
pDag
,
pJob
);
}
}
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
408b1168
...
@@ -197,15 +197,21 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
...
@@ -197,15 +197,21 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
ctgGetVgInfoFromDB
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SDBVgroupInfo
*
dbInfo
,
SArray
*
vgroupList
)
{
int32_t
ctgGetVgInfoFromDB
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SDBVgroupInfo
*
dbInfo
,
SArray
*
*
vgroupList
)
{
SHashObj
*
vgroupHash
=
NULL
;
SHashObj
*
vgroupHash
=
NULL
;
SVgroupInfo
*
vgInfo
=
NULL
;
SVgroupInfo
*
vgInfo
=
NULL
;
*
vgroupList
=
taosArrayInit
(
taosHashGetSize
(
dbInfo
->
vgInfo
),
sizeof
(
SVgroupInfo
));
if
(
NULL
==
*
vgroupList
)
{
ctgError
(
"taosArrayInit failed"
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
void
*
pIter
=
taosHashIterate
(
dbInfo
->
vgInfo
,
NULL
);
void
*
pIter
=
taosHashIterate
(
dbInfo
->
vgInfo
,
NULL
);
while
(
pIter
)
{
while
(
pIter
)
{
vgInfo
=
pIter
;
vgInfo
=
pIter
;
if
(
NULL
==
taosArrayPush
(
vgroupList
,
vgInfo
))
{
if
(
NULL
==
taosArrayPush
(
*
vgroupList
,
vgInfo
))
{
ctgError
(
"taosArrayPush failed"
);
ctgError
(
"taosArrayPush failed"
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
...
@@ -295,14 +301,6 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
...
@@ -295,14 +301,6 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
pCatalog
->
tableCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
ctgError
(
"init hash[%d] for tablemeta cache failed"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
pCatalog
->
tableCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
pCatalog
->
tableCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
if
(
NULL
==
pCatalog
->
tableCache
.
cache
)
{
...
@@ -329,7 +327,8 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
...
@@ -329,7 +327,8 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
}
}
}
}
if
(
taosHashPut
(
pCatalog
->
tableCache
.
cache
,
output
->
tbFname
,
strlen
(
output
->
tbFname
),
output
->
tbMeta
,
sizeof
(
*
output
->
tbMeta
))
!=
0
)
{
int32_t
tbSize
=
sizeof
(
*
output
->
tbMeta
)
+
sizeof
(
SSchema
)
*
(
output
->
tbMeta
->
tableInfo
.
numOfColumns
+
output
->
tbMeta
->
tableInfo
.
numOfTags
);
if
(
taosHashPut
(
pCatalog
->
tableCache
.
cache
,
output
->
tbFname
,
strlen
(
output
->
tbFname
),
output
->
tbMeta
,
tbSize
)
!=
0
)
{
ctgError
(
"push table[%s] to table cache failed"
,
output
->
tbFname
);
ctgError
(
"push table[%s] to table cache failed"
,
output
->
tbFname
);
goto
error_exit
;
goto
error_exit
;
}
}
...
@@ -529,7 +528,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
...
@@ -529,7 +528,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
return
ctgGetTableMetaImpl
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
pTableName
,
true
,
pTableMeta
);
return
ctgGetTableMetaImpl
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
pTableName
,
true
,
pTableMeta
);
}
}
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
pVgroupList
)
{
int32_t
catalogGetTableDistVgroup
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SArray
*
*
pVgroupList
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pDBName
||
NULL
==
pTableName
||
NULL
==
pVgroupList
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pDBName
||
NULL
==
pTableName
||
NULL
==
pVgroupList
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
}
...
@@ -549,18 +548,30 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
...
@@ -549,18 +548,30 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
int32_t
vgId
=
tbMeta
->
vgId
;
int32_t
vgId
=
tbMeta
->
vgId
;
if
(
NULL
==
taosHashGetClone
(
dbVgroup
.
vgInfo
,
&
vgId
,
sizeof
(
vgId
),
&
vgroupInfo
))
{
if
(
NULL
==
taosHashGetClone
(
dbVgroup
.
vgInfo
,
&
vgId
,
sizeof
(
vgId
),
&
vgroupInfo
))
{
ctgError
(
"vgId[%d] not found in vgroup list"
,
vgId
);
ctgError
(
"vgId[%d] not found in vgroup list"
,
vgId
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
*
pVgroupList
=
taosArrayInit
(
1
,
sizeof
(
SVgroupInfo
));
if
(
NULL
==
*
pVgroupList
)
{
ctgError
(
"taosArrayInit failed"
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
if
(
NULL
==
taosArrayPush
(
pVgroupList
,
&
vgroupInfo
))
{
if
(
NULL
==
taosArrayPush
(
*
pVgroupList
,
&
vgroupInfo
))
{
ctgError
(
"push vgroupInfo to array failed"
);
ctgError
(
"push vgroupInfo to array failed"
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
}
}
}
tfree
(
tbMeta
);
return
TSDB_CODE_SUCCESS
;
_return:
_return:
tfree
(
tbMeta
);
tfree
(
tbMeta
);
taosArrayDestroy
(
*
pVgroupList
);
CTG_RET
(
code
);
CTG_RET
(
code
);
}
}
...
@@ -634,8 +645,8 @@ _return:
...
@@ -634,8 +645,8 @@ _return:
CTG_RET
(
code
);
CTG_RET
(
code
);
}
}
int32_t
catalogGetQnodeList
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
S
EpSet
*
pQnodeEpSe
t
)
{
int32_t
catalogGetQnodeList
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
S
Array
*
pQnodeLis
t
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pQnode
EpSe
t
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pQnode
Lis
t
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
}
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
408b1168
...
@@ -32,27 +32,28 @@
...
@@ -32,27 +32,28 @@
#include "stub.h"
#include "stub.h"
#include "addr_any.h"
#include "addr_any.h"
typedef
struct
SAppInstInfo
{
int64_t
numOfConns
;
SCorEpSet
mgmtEp
;
}
SAppInstInfo
;
typedef
struct
STscObj
{
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_PASSWORD_LEN
];
char
acctId
[
TSDB_ACCT_ID_LEN
];
char
db
[
TSDB_ACCT_ID_LEN
+
TSDB_DB_NAME_LEN
];
uint32_t
connId
;
uint64_t
id
;
// ref ID returned by taosAddRef
// struct SSqlObj *sqlList;
void
*
pTransporter
;
pthread_mutex_t
mutex
;
// used to protect the operation on db
int32_t
numOfReqs
;
// number of sqlObj from this tscObj
SAppInstInfo
*
pAppInfo
;
}
STscObj
;
namespace
{
namespace
{
void
ctgTestSetPrepareTableMeta
();
void
ctgTestSetPrepareCTableMeta
();
void
ctgTestSetPrepareSTableMeta
();
int32_t
ctgTestVgNum
=
10
;
int32_t
ctgTestColNum
=
2
;
int32_t
ctgTestTagNum
=
1
;
int32_t
ctgTestSVersion
=
1
;
int32_t
ctgTestTVersion
=
1
;
char
*
ctgTestClusterId
=
"cluster1"
;
char
*
ctgTestDbname
=
"1.db1"
;
char
*
ctgTestTablename
=
"table1"
;
char
*
ctgTestCTablename
=
"ctable1"
;
char
*
ctgTestSTablename
=
"stable1"
;
void
sendCreateDbMsg
(
void
*
shandle
,
SEpSet
*
pEpSet
)
{
void
sendCreateDbMsg
(
void
*
shandle
,
SEpSet
*
pEpSet
)
{
SCreateDbMsg
*
pReq
=
(
SCreateDbMsg
*
)
rpcMallocCont
(
sizeof
(
SCreateDbMsg
));
SCreateDbMsg
*
pReq
=
(
SCreateDbMsg
*
)
rpcMallocCont
(
sizeof
(
SCreateDbMsg
));
strcpy
(
pReq
->
db
,
"1.db1"
);
strcpy
(
pReq
->
db
,
"1.db1"
);
...
@@ -88,22 +89,281 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
...
@@ -88,22 +89,281 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
ASSERT_EQ
(
rpcRsp
.
code
,
0
);
ASSERT_EQ
(
rpcRsp
.
code
,
0
);
}
}
void
__rpcSendRecv
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
void
ctgTestPrepareDbVgroups
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
SUseDbRsp
*
rspMsg
=
NULL
;
//todo
SUseDbRsp
*
rspMsg
=
NULL
;
//todo
pRsp
->
code
=
0
;
pRsp
->
contLen
=
sizeof
(
SUseDbRsp
)
+
ctgTestVgNum
*
sizeof
(
SVgroupInfo
);
pRsp
->
pCont
=
calloc
(
1
,
pRsp
->
contLen
);
rspMsg
=
(
SUseDbRsp
*
)
pRsp
->
pCont
;
strcpy
(
rspMsg
->
db
,
ctgTestDbname
);
rspMsg
->
vgVersion
=
htonl
(
1
);
rspMsg
->
vgNum
=
htonl
(
ctgTestVgNum
);
rspMsg
->
hashMethod
=
0
;
SVgroupInfo
*
vg
=
NULL
;
uint32_t
hashUnit
=
UINT32_MAX
/
ctgTestVgNum
;
for
(
int32_t
i
=
0
;
i
<
ctgTestVgNum
;
++
i
)
{
vg
=
&
rspMsg
->
vgroupInfo
[
i
];
vg
->
vgId
=
htonl
(
i
+
1
);
vg
->
hashBegin
=
htonl
(
i
*
hashUnit
);
vg
->
hashEnd
=
htonl
(
hashUnit
*
(
i
+
1
)
-
1
);
vg
->
numOfEps
=
i
%
TSDB_MAX_REPLICA
+
1
;
vg
->
inUse
=
i
%
vg
->
numOfEps
;
for
(
int32_t
n
=
0
;
n
<
vg
->
numOfEps
;
++
n
)
{
SEpAddrMsg
*
addr
=
&
vg
->
epAddr
[
n
];
strcpy
(
addr
->
fqdn
,
"a0"
);
addr
->
port
=
htons
(
n
+
22
);
}
}
vg
->
hashEnd
=
htonl
(
UINT32_MAX
);
return
;
}
void
ctgTestPrepareTableMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
STableMetaMsg
*
rspMsg
=
NULL
;
//todo
pRsp
->
code
=
0
;
pRsp
->
contLen
=
sizeof
(
STableMetaMsg
)
+
(
ctgTestColNum
+
ctgTestTagNum
)
*
sizeof
(
SSchema
);
pRsp
->
pCont
=
calloc
(
1
,
pRsp
->
contLen
);
rspMsg
=
(
STableMetaMsg
*
)
pRsp
->
pCont
;
sprintf
(
rspMsg
->
tbFname
,
"%s.%s"
,
ctgTestDbname
,
ctgTestTablename
);
rspMsg
->
numOfTags
=
0
;
rspMsg
->
numOfColumns
=
htonl
(
ctgTestColNum
);
rspMsg
->
precision
=
1
;
rspMsg
->
tableType
=
TSDB_NORMAL_TABLE
;
rspMsg
->
update
=
1
;
rspMsg
->
sversion
=
htonl
(
ctgTestSVersion
);
rspMsg
->
tversion
=
htonl
(
ctgTestTVersion
);
rspMsg
->
suid
=
0
;
rspMsg
->
tuid
=
htobe64
(
0x0000000000000001
);
rspMsg
->
vgId
=
htonl
(
8
);
SSchema
*
s
=
NULL
;
s
=
&
rspMsg
->
pSchema
[
0
];
s
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
s
->
colId
=
htonl
(
0
);
s
->
bytes
=
htonl
(
8
);
strcpy
(
s
->
name
,
"ts"
);
s
=
&
rspMsg
->
pSchema
[
1
];
s
->
type
=
TSDB_DATA_TYPE_INT
;
s
->
colId
=
htonl
(
1
);
s
->
bytes
=
htonl
(
4
);
strcpy
(
s
->
name
,
"col1"
);
return
;
}
void
ctgTestPrepareCTableMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
STableMetaMsg
*
rspMsg
=
NULL
;
//todo
pRsp
->
code
=
0
;
pRsp
->
contLen
=
sizeof
(
STableMetaMsg
)
+
(
ctgTestColNum
+
ctgTestTagNum
)
*
sizeof
(
SSchema
);
pRsp
->
pCont
=
calloc
(
1
,
pRsp
->
contLen
);
rspMsg
=
(
STableMetaMsg
*
)
pRsp
->
pCont
;
sprintf
(
rspMsg
->
tbFname
,
"%s.%s"
,
ctgTestDbname
,
ctgTestCTablename
);
sprintf
(
rspMsg
->
stbFname
,
"%s.%s"
,
ctgTestDbname
,
ctgTestSTablename
);
rspMsg
->
numOfTags
=
htonl
(
ctgTestTagNum
);
rspMsg
->
numOfColumns
=
htonl
(
ctgTestColNum
);
rspMsg
->
precision
=
1
;
rspMsg
->
tableType
=
TSDB_CHILD_TABLE
;
rspMsg
->
update
=
1
;
rspMsg
->
sversion
=
htonl
(
ctgTestSVersion
);
rspMsg
->
tversion
=
htonl
(
ctgTestTVersion
);
rspMsg
->
suid
=
htobe64
(
0x0000000000000002
);
rspMsg
->
tuid
=
htobe64
(
0x0000000000000003
);
rspMsg
->
vgId
=
htonl
(
9
);
SSchema
*
s
=
NULL
;
s
=
&
rspMsg
->
pSchema
[
0
];
s
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
s
->
colId
=
htonl
(
0
);
s
->
bytes
=
htonl
(
8
);
strcpy
(
s
->
name
,
"ts"
);
s
=
&
rspMsg
->
pSchema
[
1
];
s
->
type
=
TSDB_DATA_TYPE_INT
;
s
->
colId
=
htonl
(
1
);
s
->
bytes
=
htonl
(
4
);
strcpy
(
s
->
name
,
"col1s"
);
s
=
&
rspMsg
->
pSchema
[
2
];
s
->
type
=
TSDB_DATA_TYPE_BINARY
;
s
->
colId
=
htonl
(
2
);
s
->
bytes
=
htonl
(
12
);
strcpy
(
s
->
name
,
"tag1s"
);
return
;
}
void
ctgTestPrepareSTableMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
STableMetaMsg
*
rspMsg
=
NULL
;
//todo
pRsp
->
code
=
0
;
pRsp
->
contLen
=
sizeof
(
STableMetaMsg
)
+
(
ctgTestColNum
+
ctgTestTagNum
)
*
sizeof
(
SSchema
);
pRsp
->
pCont
=
calloc
(
1
,
pRsp
->
contLen
);
rspMsg
=
(
STableMetaMsg
*
)
pRsp
->
pCont
;
sprintf
(
rspMsg
->
tbFname
,
"%s.%s"
,
ctgTestDbname
,
ctgTestSTablename
);
sprintf
(
rspMsg
->
stbFname
,
"%s.%s"
,
ctgTestDbname
,
ctgTestSTablename
);
rspMsg
->
numOfTags
=
htonl
(
ctgTestTagNum
);
rspMsg
->
numOfColumns
=
htonl
(
ctgTestColNum
);
rspMsg
->
precision
=
1
;
rspMsg
->
tableType
=
TSDB_SUPER_TABLE
;
rspMsg
->
update
=
1
;
rspMsg
->
sversion
=
htonl
(
ctgTestSVersion
);
rspMsg
->
tversion
=
htonl
(
ctgTestTVersion
);
rspMsg
->
suid
=
htobe64
(
0x0000000000000002
);
rspMsg
->
tuid
=
htobe64
(
0x0000000000000003
);
rspMsg
->
vgId
=
0
;
SSchema
*
s
=
NULL
;
s
=
&
rspMsg
->
pSchema
[
0
];
s
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
s
->
colId
=
htonl
(
0
);
s
->
bytes
=
htonl
(
8
);
strcpy
(
s
->
name
,
"ts"
);
s
=
&
rspMsg
->
pSchema
[
1
];
s
->
type
=
TSDB_DATA_TYPE_INT
;
s
->
colId
=
htonl
(
1
);
s
->
bytes
=
htonl
(
4
);
strcpy
(
s
->
name
,
"col1s"
);
s
=
&
rspMsg
->
pSchema
[
2
];
s
->
type
=
TSDB_DATA_TYPE_BINARY
;
s
->
colId
=
htonl
(
2
);
s
->
bytes
=
htonl
(
12
);
strcpy
(
s
->
name
,
"tag1s"
);
return
;
return
;
}
}
void
ctgTestPrepareDbVgroupsAndNormalMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareTableMeta
();
return
;
}
void
ctgTestPrepareDbVgroupsAndChildMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareCTableMeta
();
return
;
}
void
ctgTestPrepareDbVgroupsAndSuperMeta
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
ctgTestPrepareDbVgroups
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
ctgTestSetPrepareSTableMeta
();
return
;
}
void
ctgTestSetPrepareDbVgroups
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareDbVgroups
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroups
);
}
}
}
void
ctgTestSetPrepareTableMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareTableMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareTableMeta
);
}
}
}
void
ctgTestSetPrepareCTableMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareCTableMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareCTableMeta
);
}
}
}
void
ctgTestSetPrepareSTableMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareSTableMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareSTableMeta
);
}
}
}
void
ctgTestSetPrepareDbVgroupsAndNormalMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareDbVgroupsAndNormalMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroupsAndNormalMeta
);
}
}
}
void
ctgTestSetPrepareDbVgroupsAndChildMeta
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareDbVgroupsAndChildMeta
);
{
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroupsAndChildMeta
);
}
}
}
void
initTestEnv
()
{
void
ctgTestSetPrepareDbVgroupsAndSuperMeta
()
{
static
Stub
stub
;
static
Stub
stub
;
stub
.
set
(
rpcSendRecv
,
__rpcSendRecv
);
stub
.
set
(
rpcSendRecv
,
ctgTestPrepareDbVgroupsAndSuperMeta
);
{
{
AddrAny
any
(
"libtransport.so"
);
AddrAny
any
(
"libtransport.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
any
.
get_global_func_addr_dynsym
(
"^rpcSendRecv$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
__rpcSendRecv
);
stub
.
set
(
f
.
second
,
ctgTestPrepareDbVgroupsAndSuperMeta
);
}
}
}
}
}
}
...
@@ -111,33 +371,267 @@ void initTestEnv() {
...
@@ -111,33 +371,267 @@ void initTestEnv() {
}
}
TEST
(
testCase
,
normalCase
)
{
TEST
(
tableMeta
,
normalTable
)
{
STscObj
*
pConn
=
(
STscObj
*
)
taos_connect
(
"127.0.0.1"
,
"root"
,
"taosdata"
,
NULL
,
0
);
struct
SCatalog
*
pCtg
=
NULL
;
assert
(
pConn
!=
NULL
);
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
ctgTestSetPrepareDbVgroups
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableHashVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestTablename
,
&
vgInfo
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
vgInfo
.
vgId
,
8
);
ASSERT_EQ
(
vgInfo
.
numOfEps
,
3
);
ctgTestSetPrepareTableMeta
();
STableMeta
*
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
8
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_NORMAL_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
0
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
8
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_NORMAL_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
0
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
catalogDestroy
();
}
char
*
clusterId
=
"cluster1"
;
TEST
(
tableMeta
,
childTableCase
)
{
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
struct
SCatalog
*
pCtg
=
NULL
;
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
SVgroupInfo
vgInfo
=
{
0
};
ctgTestSetPrepareDbVgroupsAndChildMeta
();
initQueryModuleMsgHandle
();
initQueryModuleMsgHandle
();
sendCreateDbMsg
(
pConn
->
pTransporter
,
&
pConn
->
pAppInfo
->
mgmtEp
.
epSet
);
//
sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
clusterId
,
&
pCtg
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
STableMeta
*
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
9
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_CHILD_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
9
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_CHILD_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
code
=
catalogGetTableHashVgroup
(
pCtg
,
pConn
->
pTransporter
,
&
pConn
->
pAppInfo
->
mgmtEp
.
epSet
,
dbname
,
tablename
,
&
vgInfo
);
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestSTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
0
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_SUPER_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
taos_close
(
pConn
);
catalogDestroy
(
);
}
}
TEST
(
tableMeta
,
superTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
vgInfo
=
{
0
};
ctgTestSetPrepareDbVgroupsAndSuperMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
STableMeta
*
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestSTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
0
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_SUPER_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
ctgTestSetPrepareCTableMeta
();
tableMeta
=
NULL
;
code
=
catalogGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
9
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_CHILD_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
tableMeta
=
NULL
;
code
=
catalogRenewAndGetTableMeta
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
9
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_CHILD_TABLE
);
ASSERT_EQ
(
tableMeta
->
sversion
,
ctgTestSVersion
);
ASSERT_EQ
(
tableMeta
->
tversion
,
ctgTestTVersion
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfColumns
,
ctgTestColNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
numOfTags
,
ctgTestTagNum
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
precision
,
1
);
ASSERT_EQ
(
tableMeta
->
tableInfo
.
rowSize
,
12
);
catalogDestroy
();
}
TEST
(
tableDistVgroup
,
normalTable
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
*
vgInfo
=
NULL
;
SArray
*
vgList
=
NULL
;
ctgTestSetPrepareDbVgroupsAndNormalMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestTablename
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
8
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
3
);
catalogDestroy
();
}
TEST
(
tableDistVgroup
,
childTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
*
vgInfo
=
NULL
;
SArray
*
vgList
=
NULL
;
ctgTestSetPrepareDbVgroupsAndChildMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestCTablename
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
9
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
4
);
catalogDestroy
();
}
TEST
(
tableDistVgroup
,
superTableCase
)
{
struct
SCatalog
*
pCtg
=
NULL
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SVgroupInfo
*
vgInfo
=
NULL
;
SArray
*
vgList
=
NULL
;
ctgTestSetPrepareDbVgroupsAndSuperMeta
();
initQueryModuleMsgHandle
();
//sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
int32_t
code
=
catalogInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetHandle
(
ctgTestClusterId
,
&
pCtg
);
ASSERT_EQ
(
code
,
0
);
code
=
catalogGetTableDistVgroup
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
ctgTestSTablename
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
10
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
0
);
ASSERT_EQ
(
vgInfo
->
vgId
,
1
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
1
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
1
);
ASSERT_EQ
(
vgInfo
->
vgId
,
2
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
2
);
vgInfo
=
(
SVgroupInfo
*
)
taosArrayGet
(
vgList
,
2
);
ASSERT_EQ
(
vgInfo
->
vgId
,
3
);
ASSERT_EQ
(
vgInfo
->
numOfEps
,
3
);
catalogDestroy
();
}
int
main
(
int
argc
,
char
**
argv
)
{
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
testing
::
InitGoogleTest
(
&
argc
,
argv
);
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
408b1168
...
@@ -92,8 +92,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
...
@@ -92,8 +92,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
return
TSDB_CODE_TSC_VALUE_OUT_OF_RANGE
;
}
}
pRsp
->
vgVersion
=
hton
l
(
pRsp
->
vgVersion
);
pRsp
->
vgVersion
=
ntoh
l
(
pRsp
->
vgVersion
);
pRsp
->
vgNum
=
hton
l
(
pRsp
->
vgNum
);
pRsp
->
vgNum
=
ntoh
l
(
pRsp
->
vgNum
);
if
(
pRsp
->
vgNum
<
0
)
{
if
(
pRsp
->
vgNum
<
0
)
{
qError
(
"invalid db[%s] vgroup number[%d]"
,
pRsp
->
db
,
pRsp
->
vgNum
);
qError
(
"invalid db[%s] vgroup number[%d]"
,
pRsp
->
db
,
pRsp
->
vgNum
);
...
@@ -115,12 +115,12 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
...
@@ -115,12 +115,12 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
}
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pRsp
->
vgNum
;
++
i
)
{
pRsp
->
vgroupInfo
[
i
].
vgId
=
hton
l
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
pRsp
->
vgroupInfo
[
i
].
vgId
=
ntoh
l
(
pRsp
->
vgroupInfo
[
i
].
vgId
);
pRsp
->
vgroupInfo
[
i
].
hashBegin
=
hton
l
(
pRsp
->
vgroupInfo
[
i
].
hashBegin
);
pRsp
->
vgroupInfo
[
i
].
hashBegin
=
ntoh
l
(
pRsp
->
vgroupInfo
[
i
].
hashBegin
);
pRsp
->
vgroupInfo
[
i
].
hashEnd
=
hton
l
(
pRsp
->
vgroupInfo
[
i
].
hashEnd
);
pRsp
->
vgroupInfo
[
i
].
hashEnd
=
ntoh
l
(
pRsp
->
vgroupInfo
[
i
].
hashEnd
);
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
for
(
int32_t
n
=
0
;
n
<
pRsp
->
vgroupInfo
[
i
].
numOfEps
;
++
n
)
{
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
hton
s
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
=
ntoh
s
(
pRsp
->
vgroupInfo
[
i
].
epAddr
[
n
].
port
);
}
}
if
(
0
!=
taosHashPut
(
pOut
->
dbVgroup
.
vgInfo
,
&
pRsp
->
vgroupInfo
[
i
].
vgId
,
sizeof
(
pRsp
->
vgroupInfo
[
i
].
vgId
),
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
])))
{
if
(
0
!=
taosHashPut
(
pOut
->
dbVgroup
.
vgInfo
,
&
pRsp
->
vgroupInfo
[
i
].
vgId
,
sizeof
(
pRsp
->
vgroupInfo
[
i
].
vgId
),
&
pRsp
->
vgroupInfo
[
i
],
sizeof
(
pRsp
->
vgroupInfo
[
i
])))
{
...
@@ -142,13 +142,13 @@ _return:
...
@@ -142,13 +142,13 @@ _return:
}
}
static
int32_t
queryConvertTableMetaMsg
(
STableMetaMsg
*
pMetaMsg
)
{
static
int32_t
queryConvertTableMetaMsg
(
STableMetaMsg
*
pMetaMsg
)
{
pMetaMsg
->
numOfTags
=
hton
l
(
pMetaMsg
->
numOfTags
);
pMetaMsg
->
numOfTags
=
ntoh
l
(
pMetaMsg
->
numOfTags
);
pMetaMsg
->
numOfColumns
=
hton
l
(
pMetaMsg
->
numOfColumns
);
pMetaMsg
->
numOfColumns
=
ntoh
l
(
pMetaMsg
->
numOfColumns
);
pMetaMsg
->
sversion
=
hton
l
(
pMetaMsg
->
sversion
);
pMetaMsg
->
sversion
=
ntoh
l
(
pMetaMsg
->
sversion
);
pMetaMsg
->
tversion
=
hton
l
(
pMetaMsg
->
tversion
);
pMetaMsg
->
tversion
=
ntoh
l
(
pMetaMsg
->
tversion
);
pMetaMsg
->
tuid
=
htobe64
(
pMetaMsg
->
tuid
);
pMetaMsg
->
tuid
=
htobe64
(
pMetaMsg
->
tuid
);
pMetaMsg
->
suid
=
htobe64
(
pMetaMsg
->
suid
);
pMetaMsg
->
suid
=
htobe64
(
pMetaMsg
->
suid
);
pMetaMsg
->
vgId
=
hton
l
(
pMetaMsg
->
vgId
);
pMetaMsg
->
vgId
=
ntoh
l
(
pMetaMsg
->
vgId
);
if
(
pMetaMsg
->
numOfTags
<
0
||
pMetaMsg
->
numOfTags
>
TSDB_MAX_TAGS
)
{
if
(
pMetaMsg
->
numOfTags
<
0
||
pMetaMsg
->
numOfTags
>
TSDB_MAX_TAGS
)
{
qError
(
"invalid numOfTags[%d] in table meta rsp msg"
,
pMetaMsg
->
numOfTags
);
qError
(
"invalid numOfTags[%d] in table meta rsp msg"
,
pMetaMsg
->
numOfTags
);
...
@@ -179,8 +179,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
...
@@ -179,8 +179,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
int32_t
numOfTotalCols
=
pMetaMsg
->
numOfColumns
+
pMetaMsg
->
numOfTags
;
int32_t
numOfTotalCols
=
pMetaMsg
->
numOfColumns
+
pMetaMsg
->
numOfTags
;
for
(
int
i
=
0
;
i
<
numOfTotalCols
;
++
i
)
{
for
(
int
i
=
0
;
i
<
numOfTotalCols
;
++
i
)
{
pSchema
->
bytes
=
hton
l
(
pSchema
->
bytes
);
pSchema
->
bytes
=
ntoh
l
(
pSchema
->
bytes
);
pSchema
->
colId
=
hton
l
(
pSchema
->
colId
);
pSchema
->
colId
=
ntoh
l
(
pSchema
->
colId
);
pSchema
++
;
pSchema
++
;
}
}
...
@@ -203,6 +203,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
...
@@ -203,6 +203,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
pTableMeta
->
vgId
=
isSuperTable
?
0
:
msg
->
vgId
;
pTableMeta
->
tableType
=
isSuperTable
?
TSDB_SUPER_TABLE
:
msg
->
tableType
;
pTableMeta
->
tableType
=
isSuperTable
?
TSDB_SUPER_TABLE
:
msg
->
tableType
;
pTableMeta
->
uid
=
msg
->
suid
;
pTableMeta
->
uid
=
msg
->
suid
;
pTableMeta
->
suid
=
msg
->
suid
;
pTableMeta
->
suid
=
msg
->
suid
;
...
@@ -213,12 +214,12 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
...
@@ -213,12 +214,12 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
pTableMeta
->
tableInfo
.
precision
=
msg
->
precision
;
pTableMeta
->
tableInfo
.
precision
=
msg
->
precision
;
pTableMeta
->
tableInfo
.
numOfColumns
=
msg
->
numOfColumns
;
pTableMeta
->
tableInfo
.
numOfColumns
=
msg
->
numOfColumns
;
memcpy
(
pTableMeta
->
schema
,
msg
->
pSchema
,
sizeof
(
SSchema
)
*
total
);
for
(
int32_t
i
=
0
;
i
<
msg
->
numOfColumns
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
msg
->
numOfColumns
;
++
i
)
{
pTableMeta
->
tableInfo
.
rowSize
+=
pTableMeta
->
schema
[
i
].
bytes
;
pTableMeta
->
tableInfo
.
rowSize
+=
pTableMeta
->
schema
[
i
].
bytes
;
}
}
memcpy
(
pTableMeta
->
schema
,
msg
->
pSchema
,
sizeof
(
SSchema
)
*
total
);
*
pMeta
=
pTableMeta
;
*
pMeta
=
pTableMeta
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/qworker/CMakeLists.txt
浏览文件 @
408b1168
...
@@ -10,3 +10,5 @@ target_link_libraries(
...
@@ -10,3 +10,5 @@ target_link_libraries(
qworker
qworker
PRIVATE os util transport planner qcom
PRIVATE os util transport planner qcom
)
)
ADD_SUBDIRECTORY
(
test
)
\ No newline at end of file
source/libs/qworker/src/qworker.c
浏览文件 @
408b1168
...
@@ -944,6 +944,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...
@@ -944,6 +944,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
msg
->
schedulerId
=
htobe64
(
msg
->
schedulerId
);
msg
->
queryId
=
htobe64
(
msg
->
queryId
);
msg
->
taskId
=
htobe64
(
msg
->
taskId
);
msg
->
contentLen
=
ntohl
(
msg
->
contentLen
);
bool
queryDone
=
false
;
bool
queryDone
=
false
;
bool
queryRsp
=
false
;
bool
queryRsp
=
false
;
bool
needStop
=
false
;
bool
needStop
=
false
;
...
...
source/libs/qworker/test/CMakeLists.txt
0 → 100644
浏览文件 @
408b1168
MESSAGE
(
STATUS
"build qworker unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
qworkerTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
qworkerTest
PUBLIC os util common transport gtest qcom planner qworker
)
TARGET_INCLUDE_DIRECTORIES
(
qworkerTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/qworker/"
PRIVATE
"
${
CMAKE_SOURCE_DIR
}
/source/libs/qworker/inc"
)
source/libs/qworker/test/qworkerTests.cpp
0 → 100644
浏览文件 @
408b1168
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
#include "taos.h"
#include "tdef.h"
#include "tvariant.h"
#include "tep.h"
#include "trpc.h"
#include "planner.h"
#include "qworker.h"
#include "stub.h"
#include "addr_any.h"
namespace
{
int32_t
qwtStringToPlan
(
const
char
*
str
,
SSubplan
**
subplan
)
{
return
0
;
}
void
stubSetStringToPlan
()
{
static
Stub
stub
;
stub
.
set
(
qStringToSubplan
,
qwtStringToPlan
);
{
AddrAny
any
(
"libplanner.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^qStringToSubplan$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
qwtStringToPlan
);
}
}
}
}
TEST
(
testCase
,
normalCase
)
{
void
*
mgmt
=
NULL
;
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SRpcMsg
queryRpc
=
{
0
};
SRpcMsg
readyRpc
=
{
0
};
SRpcMsg
fetchRpc
=
{
0
};
SRpcMsg
dropRpc
=
{
0
};
SSubQueryMsg
*
queryMsg
=
(
SSubQueryMsg
*
)
calloc
(
1
,
sizeof
(
SSubQueryMsg
)
+
100
);
queryMsg
->
queryId
=
htobe64
(
1
);
queryMsg
->
schedulerId
=
htobe64
(
1
);
queryMsg
->
taskId
=
htobe64
(
1
);
queryMsg
->
contentLen
=
htonl
(
100
);
queryRpc
.
pCont
=
queryMsg
;
SResReadyMsg
readyMsg
=
{
0
};
readyMsg
.
schedulerId
=
htobe64
(
1
);
readyMsg
.
queryId
=
htobe64
(
1
);
readyMsg
.
taskId
=
htobe64
(
1
);
readyRpc
.
pCont
=
&
readyMsg
;
SResFetchMsg
fetchMsg
=
{
0
};
fetchMsg
.
schedulerId
=
htobe64
(
1
);
fetchMsg
.
queryId
=
htobe64
(
1
);
fetchMsg
.
taskId
=
htobe64
(
1
);
fetchRpc
.
pCont
=
&
fetchMsg
;
STaskDropMsg
dropMsg
=
{
0
};
dropMsg
.
schedulerId
=
htobe64
(
1
);
dropMsg
.
queryId
=
htobe64
(
1
);
dropMsg
.
taskId
=
htobe64
(
1
);
dropRpc
.
pCont
=
&
dropMsg
;
stubSetStringToPlan
();
code
=
qWorkerInit
(
NULL
,
&
mgmt
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessQueryMsg
(
mockPointer
,
mgmt
,
&
queryRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessReadyMsg
(
mockPointer
,
mgmt
,
&
readyRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessFetchMsg
(
mockPointer
,
mgmt
,
&
fetchRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessDropMsg
(
mockPointer
,
mgmt
,
&
dropRpc
);
ASSERT_EQ
(
code
,
0
);
}
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
408b1168
...
@@ -43,7 +43,7 @@ typedef struct SSchedulerMgmt {
...
@@ -43,7 +43,7 @@ typedef struct SSchedulerMgmt {
SHashObj
*
jobs
;
// key: queryId, value: SQueryJob*
SHashObj
*
jobs
;
// key: queryId, value: SQueryJob*
}
SSchedulerMgmt
;
}
SSchedulerMgmt
;
typedef
struct
S
Query
Level
{
typedef
struct
S
Sch
Level
{
int32_t
level
;
int32_t
level
;
int8_t
status
;
int8_t
status
;
SRWLatch
lock
;
SRWLatch
lock
;
...
@@ -51,12 +51,12 @@ typedef struct SQueryLevel {
...
@@ -51,12 +51,12 @@ typedef struct SQueryLevel {
int32_t
taskSucceed
;
int32_t
taskSucceed
;
int32_t
taskNum
;
int32_t
taskNum
;
SArray
*
subTasks
;
// Element is SQueryTask
SArray
*
subTasks
;
// Element is SQueryTask
}
S
Query
Level
;
}
S
Sch
Level
;
typedef
struct
S
Query
Task
{
typedef
struct
S
Sch
Task
{
uint64_t
taskId
;
// task id
uint64_t
taskId
;
// task id
S
QueryLevel
*
level
;
// level
S
SchLevel
*
level
;
// level
SSubplan
*
plan
;
// subplan
SSubplan
*
plan
;
// subplan
char
*
msg
;
// operator tree
char
*
msg
;
// operator tree
int32_t
msgLen
;
// msg length
int32_t
msgLen
;
// msg length
...
@@ -66,13 +66,20 @@ typedef struct SQueryTask {
...
@@ -66,13 +66,20 @@ typedef struct SQueryTask {
int32_t
childReady
;
// child task ready number
int32_t
childReady
;
// child task ready number
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray
*
children
;
// the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray
*
parents
;
// the data destination tasks, get data from current task, element is SQueryTask*
SArray
*
parents
;
// the data destination tasks, get data from current task, element is SQueryTask*
}
S
Query
Task
;
}
S
Sch
Task
;
typedef
struct
SQueryJob
{
typedef
struct
SSchJobAttr
{
bool
needFetch
;
bool
syncSchedule
;
bool
queryJob
;
}
SSchJobAttr
;
typedef
struct
SSchJob
{
uint64_t
queryId
;
uint64_t
queryId
;
int32_t
levelNum
;
int32_t
levelNum
;
int32_t
levelIdx
;
int32_t
levelIdx
;
int8_t
status
;
int8_t
status
;
SSchJobAttr
attr
;
SQueryProfileSummary
summary
;
SQueryProfileSummary
summary
;
SEpSet
dataSrcEps
;
SEpSet
dataSrcEps
;
SEpAddr
resEp
;
SEpAddr
resEp
;
...
@@ -81,7 +88,11 @@ typedef struct SQueryJob {
...
@@ -81,7 +88,11 @@ typedef struct SQueryJob {
tsem_t
rspSem
;
tsem_t
rspSem
;
int32_t
userFetch
;
int32_t
userFetch
;
int32_t
remoteFetch
;
int32_t
remoteFetch
;
SSchTask
*
fetchTask
;
int32_t
errCode
;
void
*
res
;
void
*
res
;
int32_t
resNumOfRows
;
SHashObj
*
execTasks
;
// executing tasks, key:taskid, value:SQueryTask*
SHashObj
*
execTasks
;
// executing tasks, key:taskid, value:SQueryTask*
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
...
@@ -89,7 +100,7 @@ typedef struct SQueryJob {
...
@@ -89,7 +100,7 @@ typedef struct SQueryJob {
SArray
*
levels
;
// Element is SQueryLevel, starting from 0.
SArray
*
levels
;
// Element is SQueryLevel, starting from 0.
SArray
*
subPlans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
SArray
*
subPlans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
}
S
Query
Job
;
}
S
Sch
Job
;
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
...
@@ -108,7 +119,7 @@ typedef struct SQueryJob {
...
@@ -108,7 +119,7 @@ typedef struct SQueryJob {
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
extern
int32_t
schLaunchTask
(
S
QueryJob
*
job
,
SQuery
Task
*
task
);
extern
int32_t
schLaunchTask
(
S
SchJob
*
job
,
SSch
Task
*
task
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
408b1168
...
@@ -51,12 +51,12 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
...
@@ -51,12 +51,12 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
*/
*/
}
}
int32_t
schBuildTaskRalation
(
S
Query
Job
*
job
,
SHashObj
*
planToTask
)
{
int32_t
schBuildTaskRalation
(
S
Sch
Job
*
job
,
SHashObj
*
planToTask
)
{
for
(
int32_t
i
=
0
;
i
<
job
->
levelNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
job
->
levelNum
;
++
i
)
{
S
Query
Level
*
level
=
taosArrayGet
(
job
->
levels
,
i
);
S
Sch
Level
*
level
=
taosArrayGet
(
job
->
levels
,
i
);
for
(
int32_t
m
=
0
;
m
<
level
->
taskNum
;
++
m
)
{
for
(
int32_t
m
=
0
;
m
<
level
->
taskNum
;
++
m
)
{
S
Query
Task
*
task
=
taosArrayGet
(
level
->
subTasks
,
m
);
S
Sch
Task
*
task
=
taosArrayGet
(
level
->
subTasks
,
m
);
SSubplan
*
plan
=
task
->
plan
;
SSubplan
*
plan
=
task
->
plan
;
int32_t
childNum
=
plan
->
pChildern
?
(
int32_t
)
taosArrayGetSize
(
plan
->
pChildern
)
:
0
;
int32_t
childNum
=
plan
->
pChildern
?
(
int32_t
)
taosArrayGetSize
(
plan
->
pChildern
)
:
0
;
int32_t
parentNum
=
plan
->
pParents
?
(
int32_t
)
taosArrayGetSize
(
plan
->
pParents
)
:
0
;
int32_t
parentNum
=
plan
->
pParents
?
(
int32_t
)
taosArrayGetSize
(
plan
->
pParents
)
:
0
;
...
@@ -70,14 +70,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
...
@@ -70,14 +70,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
}
}
for
(
int32_t
n
=
0
;
n
<
childNum
;
++
n
)
{
for
(
int32_t
n
=
0
;
n
<
childNum
;
++
n
)
{
SSubplan
*
child
=
taosArrayGet
(
plan
->
pChildern
,
n
);
SSubplan
*
*
child
=
taosArrayGet
(
plan
->
pChildern
,
n
);
S
QueryTask
*
childTask
=
taosHashGet
(
planToTask
,
&
child
,
POINTER_BYTES
);
S
SchTask
**
childTask
=
taosHashGet
(
planToTask
,
child
,
POINTER_BYTES
);
if
(
childTask
)
{
if
(
NULL
==
childTask
||
NULL
==
*
childTask
)
{
qError
(
"subplan relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
qError
(
"subplan relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
}
if
(
NULL
==
taosArrayPush
(
task
->
children
,
&
childTask
))
{
if
(
NULL
==
taosArrayPush
(
task
->
children
,
childTask
))
{
qError
(
"taosArrayPush failed"
);
qError
(
"taosArrayPush failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
...
@@ -92,14 +92,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
...
@@ -92,14 +92,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
}
}
for
(
int32_t
n
=
0
;
n
<
parentNum
;
++
n
)
{
for
(
int32_t
n
=
0
;
n
<
parentNum
;
++
n
)
{
SSubplan
*
parent
=
taosArrayGet
(
plan
->
pParents
,
n
);
SSubplan
*
*
parent
=
taosArrayGet
(
plan
->
pParents
,
n
);
S
QueryTask
*
parentTask
=
taosHashGet
(
planToTask
,
&
parent
,
POINTER_BYTES
);
S
SchTask
**
parentTask
=
taosHashGet
(
planToTask
,
parent
,
POINTER_BYTES
);
if
(
parentTask
)
{
if
(
NULL
==
parentTask
||
NULL
==
*
parentTask
)
{
qError
(
"subplan relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
qError
(
"subplan relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
}
if
(
NULL
==
taosArrayPush
(
task
->
parents
,
&
parentTask
))
{
if
(
NULL
==
taosArrayPush
(
task
->
parents
,
parentTask
))
{
qError
(
"taosArrayPush failed"
);
qError
(
"taosArrayPush failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
...
@@ -107,13 +107,13 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
...
@@ -107,13 +107,13 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
}
}
}
}
S
Query
Level
*
level
=
taosArrayGet
(
job
->
levels
,
0
);
S
Sch
Level
*
level
=
taosArrayGet
(
job
->
levels
,
0
);
if
(
level
->
taskNum
>
1
)
{
if
(
job
->
attr
.
queryJob
&&
level
->
taskNum
>
1
)
{
qError
(
"invalid plan info, level 0, taskNum:%d"
,
level
->
taskNum
);
qError
(
"invalid plan info, level 0, taskNum:%d"
,
level
->
taskNum
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
}
S
Query
Task
*
task
=
taosArrayGet
(
level
->
subTasks
,
0
);
S
Sch
Task
*
task
=
taosArrayGet
(
level
->
subTasks
,
0
);
if
(
task
->
parents
&&
taosArrayGetSize
(
task
->
parents
)
>
0
)
{
if
(
task
->
parents
&&
taosArrayGetSize
(
task
->
parents
)
>
0
)
{
qError
(
"invalid plan info, level 0, parentNum:%d"
,
(
int32_t
)
taosArrayGetSize
(
task
->
parents
));
qError
(
"invalid plan info, level 0, parentNum:%d"
,
(
int32_t
)
taosArrayGetSize
(
task
->
parents
));
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
...
@@ -124,7 +124,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
...
@@ -124,7 +124,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
}
}
int32_t
schValidateAndBuildJob
(
SQueryDag
*
dag
,
S
Query
Job
*
job
)
{
int32_t
schValidateAndBuildJob
(
SQueryDag
*
dag
,
S
Sch
Job
*
job
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
job
->
queryId
=
dag
->
queryId
;
job
->
queryId
=
dag
->
queryId
;
...
@@ -146,21 +146,23 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
...
@@ -146,21 +146,23 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
job
->
levels
=
taosArrayInit
(
levelNum
,
sizeof
(
S
Query
Level
));
job
->
levels
=
taosArrayInit
(
levelNum
,
sizeof
(
S
Sch
Level
));
if
(
NULL
==
job
->
levels
)
{
if
(
NULL
==
job
->
levels
)
{
qError
(
"taosArrayInit %d failed"
,
levelNum
);
qError
(
"taosArrayInit %d failed"
,
levelNum
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
job
->
attr
.
needFetch
=
true
;
job
->
levelNum
=
levelNum
;
job
->
levelNum
=
levelNum
;
job
->
levelIdx
=
levelNum
-
1
;
job
->
levelIdx
=
levelNum
-
1
;
job
->
subPlans
=
dag
->
pSubplans
;
job
->
subPlans
=
dag
->
pSubplans
;
S
Query
Level
level
=
{
0
};
S
Sch
Level
level
=
{
0
};
SArray
*
levelPlans
=
NULL
;
SArray
*
levelPlans
=
NULL
;
int32_t
levelPlanNum
=
0
;
int32_t
levelPlanNum
=
0
;
S
Query
Level
*
pLevel
=
NULL
;
S
Sch
Level
*
pLevel
=
NULL
;
level
.
status
=
JOB_TASK_STATUS_NOT_START
;
level
.
status
=
JOB_TASK_STATUS_NOT_START
;
...
@@ -187,7 +189,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
...
@@ -187,7 +189,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
pLevel
->
taskNum
=
levelPlanNum
;
pLevel
->
taskNum
=
levelPlanNum
;
pLevel
->
subTasks
=
taosArrayInit
(
levelPlanNum
,
sizeof
(
S
Query
Task
));
pLevel
->
subTasks
=
taosArrayInit
(
levelPlanNum
,
sizeof
(
S
Sch
Task
));
if
(
NULL
==
pLevel
->
subTasks
)
{
if
(
NULL
==
pLevel
->
subTasks
)
{
qError
(
"taosArrayInit %d failed"
,
levelPlanNum
);
qError
(
"taosArrayInit %d failed"
,
levelPlanNum
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
@@ -195,7 +197,14 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
...
@@ -195,7 +197,14 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
for
(
int32_t
n
=
0
;
n
<
levelPlanNum
;
++
n
)
{
for
(
int32_t
n
=
0
;
n
<
levelPlanNum
;
++
n
)
{
SSubplan
*
plan
=
taosArrayGet
(
levelPlans
,
n
);
SSubplan
*
plan
=
taosArrayGet
(
levelPlans
,
n
);
SQueryTask
task
=
{
0
};
SSchTask
task
=
{
0
};
if
(
plan
->
type
==
QUERY_TYPE_MODIFY
)
{
job
->
attr
.
needFetch
=
false
;
}
else
{
job
->
attr
.
queryJob
=
true
;
}
task
.
taskId
=
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
task
.
taskId
=
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
task
.
plan
=
plan
;
task
.
plan
=
plan
;
...
@@ -236,7 +245,7 @@ _return:
...
@@ -236,7 +245,7 @@ _return:
SCH_RET
(
code
);
SCH_RET
(
code
);
}
}
int32_t
schSetTaskExecEpSet
(
S
Query
Job
*
job
,
SEpSet
*
epSet
)
{
int32_t
schSetTaskExecEpSet
(
S
Sch
Job
*
job
,
SEpSet
*
epSet
)
{
if
(
epSet
->
numOfEps
>=
SCH_MAX_CONDIDATE_EP_NUM
)
{
if
(
epSet
->
numOfEps
>=
SCH_MAX_CONDIDATE_EP_NUM
)
{
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -263,7 +272,7 @@ int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) {
...
@@ -263,7 +272,7 @@ int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) {
}
}
int32_t
schPushTaskToExecList
(
S
QueryJob
*
job
,
SQuery
Task
*
task
)
{
int32_t
schPushTaskToExecList
(
S
SchJob
*
job
,
SSch
Task
*
task
)
{
if
(
0
!=
taosHashPut
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
if
(
0
!=
taosHashPut
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
qError
(
"taosHashPut failed"
);
qError
(
"taosHashPut failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
@@ -272,7 +281,7 @@ int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) {
...
@@ -272,7 +281,7 @@ int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schMoveTaskToSuccList
(
S
QueryJob
*
job
,
SQuery
Task
*
task
,
bool
*
moved
)
{
int32_t
schMoveTaskToSuccList
(
S
SchJob
*
job
,
SSch
Task
*
task
,
bool
*
moved
)
{
if
(
0
!=
taosHashRemove
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
)))
{
if
(
0
!=
taosHashRemove
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
)))
{
qWarn
(
"remove task[%"
PRIx64
"] from execTasks failed"
,
task
->
taskId
);
qWarn
(
"remove task[%"
PRIx64
"] from execTasks failed"
,
task
->
taskId
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -288,10 +297,9 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
...
@@ -288,10 +297,9 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schMoveTaskToFailList
(
S
QueryJob
*
job
,
SQuery
Task
*
task
,
bool
*
moved
)
{
int32_t
schMoveTaskToFailList
(
S
SchJob
*
job
,
SSch
Task
*
task
,
bool
*
moved
)
{
if
(
0
!=
taosHashRemove
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
)))
{
if
(
0
!=
taosHashRemove
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
)))
{
qWarn
(
"remove task[%"
PRIx64
"] from execTasks failed"
,
task
->
taskId
);
qWarn
(
"remove task[%"
PRIx64
"] from execTasks failed, it may not exist"
,
task
->
taskId
);
return
TSDB_CODE_SUCCESS
;
}
}
if
(
0
!=
taosHashPut
(
job
->
failTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
if
(
0
!=
taosHashPut
(
job
->
failTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
...
@@ -305,7 +313,7 @@ int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) {
...
@@ -305,7 +313,7 @@ int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) {
}
}
int32_t
schAsyncSendMsg
(
S
QueryJob
*
job
,
SQuery
Task
*
task
,
int32_t
msgType
)
{
int32_t
schAsyncSendMsg
(
S
SchJob
*
job
,
SSch
Task
*
task
,
int32_t
msgType
)
{
int32_t
msgSize
=
0
;
int32_t
msgSize
=
0
;
void
*
msg
=
NULL
;
void
*
msg
=
NULL
;
...
@@ -357,6 +365,9 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
...
@@ -357,6 +365,9 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
break
;
break
;
}
}
case
TDMT_VND_FETCH
:
{
case
TDMT_VND_FETCH
:
{
if
(
NULL
==
task
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
msgSize
=
sizeof
(
SResFetchMsg
);
msgSize
=
sizeof
(
SResFetchMsg
);
msg
=
calloc
(
1
,
msgSize
);
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
if
(
NULL
==
msg
)
{
...
@@ -395,7 +406,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
...
@@ -395,7 +406,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schTaskCheckAndSetRetry
(
S
QueryJob
*
job
,
SQuery
Task
*
task
,
int32_t
errCode
,
bool
*
needRetry
)
{
int32_t
schTaskCheckAndSetRetry
(
S
SchJob
*
job
,
SSch
Task
*
task
,
int32_t
errCode
,
bool
*
needRetry
)
{
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info
// TODO if needRetry, set task retry info
...
@@ -405,7 +416,7 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod
...
@@ -405,7 +416,7 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod
}
}
int32_t
schFetchFromRemote
(
S
Query
Job
*
job
)
{
int32_t
schFetchFromRemote
(
S
Sch
Job
*
job
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
atomic_val_compare_exchange_32
(
&
job
->
remoteFetch
,
0
,
1
)
!=
0
)
{
if
(
atomic_val_compare_exchange_32
(
&
job
->
remoteFetch
,
0
,
1
)
!=
0
)
{
...
@@ -413,7 +424,7 @@ int32_t schFetchFromRemote(SQueryJob *job) {
...
@@ -413,7 +424,7 @@ int32_t schFetchFromRemote(SQueryJob *job) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
SCH_ERR_JRET
(
schAsyncSendMsg
(
job
,
NULL
,
TDMT_VND_FETCH
));
SCH_ERR_JRET
(
schAsyncSendMsg
(
job
,
job
->
fetchTask
,
TDMT_VND_FETCH
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -424,8 +435,12 @@ _return:
...
@@ -424,8 +435,12 @@ _return:
}
}
int32_t
schProcessOnJobSuccess
(
SQueryJob
*
job
)
{
int32_t
schProcessOnJobPartialSuccess
(
SSchJob
*
job
)
{
job
->
status
=
JOB_TASK_STATUS_SUCCEED
;
job
->
status
=
JOB_TASK_STATUS_PARTIAL_SUCCEED
;
if
((
!
job
->
attr
.
needFetch
)
&&
job
->
attr
.
syncSchedule
)
{
tsem_post
(
&
job
->
rspSem
);
}
if
(
job
->
userFetch
)
{
if
(
job
->
userFetch
)
{
SCH_ERR_RET
(
schFetchFromRemote
(
job
));
SCH_ERR_RET
(
schFetchFromRemote
(
job
));
...
@@ -434,26 +449,27 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) {
...
@@ -434,26 +449,27 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schProcessOnJobFailure
(
S
QueryJob
*
job
)
{
int32_t
schProcessOnJobFailure
(
S
SchJob
*
job
,
int32_t
errCode
)
{
job
->
status
=
JOB_TASK_STATUS_FAILED
;
job
->
status
=
JOB_TASK_STATUS_FAILED
;
job
->
errCode
=
errCode
;
atomic_val_compare_exchange_32
(
&
job
->
remoteFetch
,
1
,
0
);
atomic_val_compare_exchange_32
(
&
job
->
remoteFetch
,
1
,
0
);
if
(
job
->
userFetch
)
{
if
(
job
->
userFetch
||
((
!
job
->
attr
.
needFetch
)
&&
job
->
attr
.
syncSchedule
)
)
{
tsem_post
(
&
job
->
rspSem
);
tsem_post
(
&
job
->
rspSem
);
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schProcessOnDataFetched
(
S
Query
Job
*
job
)
{
int32_t
schProcessOnDataFetched
(
S
Sch
Job
*
job
)
{
atomic_val_compare_exchange_32
(
&
job
->
remoteFetch
,
1
,
0
);
atomic_val_compare_exchange_32
(
&
job
->
remoteFetch
,
1
,
0
);
tsem_post
(
&
job
->
rspSem
);
tsem_post
(
&
job
->
rspSem
);
}
}
int32_t
schProcessOnTaskSuccess
(
S
QueryJob
*
job
,
SQuery
Task
*
task
)
{
int32_t
schProcessOnTaskSuccess
(
S
SchJob
*
job
,
SSch
Task
*
task
)
{
bool
moved
=
false
;
bool
moved
=
false
;
SCH_ERR_RET
(
schMoveTaskToSuccList
(
job
,
task
,
&
moved
));
SCH_ERR_RET
(
schMoveTaskToSuccList
(
job
,
task
,
&
moved
));
...
@@ -464,7 +480,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
...
@@ -464,7 +480,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
task
->
status
=
JOB_TASK_STATUS_SUCCEED
;
task
->
status
=
JOB_TASK_STATUS_SUCCEED
;
int32_t
parentNum
=
(
int32_t
)
taosArrayGetSize
(
task
->
parents
)
;
int32_t
parentNum
=
task
->
parents
?
(
int32_t
)
taosArrayGetSize
(
task
->
parents
)
:
0
;
if
(
parentNum
==
0
)
{
if
(
parentNum
==
0
)
{
if
(
task
->
plan
->
level
!=
0
)
{
if
(
task
->
plan
->
level
!=
0
)
{
qError
(
"level error"
);
qError
(
"level error"
);
...
@@ -475,7 +491,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
...
@@ -475,7 +491,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
if
(
SCH_TASK_NEED_WAIT_ALL
(
task
))
{
if
(
SCH_TASK_NEED_WAIT_ALL
(
task
))
{
SCH_LOCK
(
SCH_WRITE
,
&
task
->
level
->
lock
);
SCH_LOCK
(
SCH_WRITE
,
&
task
->
level
->
lock
);
task
->
level
->
task
Fail
ed
++
;
task
->
level
->
task
Succe
ed
++
;
taskDone
=
task
->
level
->
taskSucceed
+
task
->
level
->
taskFailed
;
taskDone
=
task
->
level
->
taskSucceed
+
task
->
level
->
taskFailed
;
SCH_UNLOCK
(
SCH_WRITE
,
&
task
->
level
->
lock
);
SCH_UNLOCK
(
SCH_WRITE
,
&
task
->
level
->
lock
);
...
@@ -486,7 +502,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
...
@@ -486,7 +502,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
if
(
task
->
level
->
taskFailed
>
0
)
{
if
(
task
->
level
->
taskFailed
>
0
)
{
job
->
status
=
JOB_TASK_STATUS_FAILED
;
job
->
status
=
JOB_TASK_STATUS_FAILED
;
SCH_ERR_RET
(
schProcessOnJobFailure
(
job
));
SCH_ERR_RET
(
schProcessOnJobFailure
(
job
,
TSDB_CODE_QRY_APP_ERROR
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -495,7 +511,9 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
...
@@ -495,7 +511,9 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
job
->
resEp
.
port
=
task
->
execAddr
.
port
;
job
->
resEp
.
port
=
task
->
execAddr
.
port
;
}
}
SCH_ERR_RET
(
schProcessOnJobSuccess
(
job
));
job
->
fetchTask
=
task
;
SCH_ERR_RET
(
schProcessOnJobPartialSuccess
(
job
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -508,21 +526,21 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
...
@@ -508,21 +526,21 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
}
}
for
(
int32_t
i
=
0
;
i
<
parentNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
parentNum
;
++
i
)
{
S
QueryTask
*
par
=
taosArrayGet
(
task
->
parents
,
i
);
S
SchTask
*
par
=
*
(
SSchTask
**
)
taosArrayGet
(
task
->
parents
,
i
);
++
par
->
childReady
;
++
par
->
childReady
;
SCH_ERR_RET
(
qSetSubplanExecutionNode
(
par
->
plan
,
task
->
plan
->
id
.
templateId
,
&
task
->
execAddr
));
SCH_ERR_RET
(
qSetSubplanExecutionNode
(
par
->
plan
,
task
->
plan
->
id
.
templateId
,
&
task
->
execAddr
));
if
(
SCH_TASK_READY_TO_LUNCH
(
par
))
{
if
(
SCH_TASK_READY_TO_LUNCH
(
par
))
{
SCH_ERR_RET
(
schLaunchTask
(
job
,
task
));
SCH_ERR_RET
(
schLaunchTask
(
job
,
par
));
}
}
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schProcessOnTaskFailure
(
S
QueryJob
*
job
,
SQuery
Task
*
task
,
int32_t
errCode
)
{
int32_t
schProcessOnTaskFailure
(
S
SchJob
*
job
,
SSch
Task
*
task
,
int32_t
errCode
)
{
bool
needRetry
=
false
;
bool
needRetry
=
false
;
bool
moved
=
false
;
bool
moved
=
false
;
int32_t
taskDone
=
0
;
int32_t
taskDone
=
0
;
...
@@ -534,7 +552,6 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
...
@@ -534,7 +552,6 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
SCH_ERR_RET
(
schMoveTaskToFailList
(
job
,
task
,
&
moved
));
SCH_ERR_RET
(
schMoveTaskToFailList
(
job
,
task
,
&
moved
));
if
(
!
moved
)
{
if
(
!
moved
)
{
SCH_TASK_ERR_LOG
(
"task may already moved, status:%d"
,
task
->
status
);
SCH_TASK_ERR_LOG
(
"task may already moved, status:%d"
,
task
->
status
);
return
TSDB_CODE_SUCCESS
;
}
}
if
(
SCH_TASK_NEED_WAIT_ALL
(
task
))
{
if
(
SCH_TASK_NEED_WAIT_ALL
(
task
))
{
...
@@ -550,7 +567,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
...
@@ -550,7 +567,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
}
}
job
->
status
=
JOB_TASK_STATUS_FAILED
;
job
->
status
=
JOB_TASK_STATUS_FAILED
;
SCH_ERR_RET
(
schProcessOnJobFailure
(
job
));
SCH_ERR_RET
(
schProcessOnJobFailure
(
job
,
errCode
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -560,13 +577,29 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
...
@@ -560,13 +577,29 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schHandleRspMsg
(
S
QueryJob
*
job
,
SQueryTask
*
task
,
int32_t
msgTyp
e
,
int32_t
rspCode
)
{
int32_t
schHandleRspMsg
(
S
SchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSiz
e
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
switch
(
msgType
)
{
switch
(
msgType
)
{
case
TDMT_VND_QUERY
:
case
TDMT_VND_SUBMIT
:
{
if
(
rspCode
!=
TSDB_CODE_SUCCESS
)
{
SShellSubmitRspMsg
*
rsp
=
(
SShellSubmitRspMsg
*
)
msg
;
SCH_ERR_JRET
(
schProcessOnTaskFailure
(
job
,
task
,
rspCode
));
if
(
rsp
->
code
!=
TSDB_CODE_SUCCESS
)
{
SCH_ERR_JRET
(
schProcessOnTaskFailure
(
job
,
task
,
rsp
->
code
));
}
else
{
job
->
resNumOfRows
+=
rsp
->
affectedRows
;
code
=
schProcessOnTaskSuccess
(
job
,
task
);
if
(
code
)
{
goto
_task_error
;
}
}
break
;
}
case
TDMT_VND_QUERY
:
{
SQueryTableRsp
*
rsp
=
(
SQueryTableRsp
*
)
msg
;
if
(
rsp
->
code
!=
TSDB_CODE_SUCCESS
)
{
SCH_ERR_JRET
(
schProcessOnTaskFailure
(
job
,
task
,
rsp
->
code
));
}
else
{
}
else
{
code
=
schAsyncSendMsg
(
job
,
task
,
TDMT_VND_RES_READY
);
code
=
schAsyncSendMsg
(
job
,
task
,
TDMT_VND_RES_READY
);
if
(
code
)
{
if
(
code
)
{
...
@@ -574,9 +607,12 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32
...
@@ -574,9 +607,12 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32
}
}
}
}
break
;
break
;
case
TDMT_VND_RES_READY
:
}
if
(
rspCode
!=
TSDB_CODE_SUCCESS
)
{
case
TDMT_VND_RES_READY
:
{
SCH_ERR_JRET
(
schProcessOnTaskFailure
(
job
,
task
,
rspCode
));
SResReadyRsp
*
rsp
=
(
SResReadyRsp
*
)
msg
;
if
(
rsp
->
code
!=
TSDB_CODE_SUCCESS
)
{
SCH_ERR_JRET
(
schProcessOnTaskFailure
(
job
,
task
,
rsp
->
code
));
}
else
{
}
else
{
code
=
schProcessOnTaskSuccess
(
job
,
task
);
code
=
schProcessOnTaskSuccess
(
job
,
task
);
if
(
code
)
{
if
(
code
)
{
...
@@ -584,10 +620,17 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32
...
@@ -584,10 +620,17 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32
}
}
}
}
break
;
break
;
case
TDMT_VND_FETCH
:
}
case
TDMT_VND_FETCH
:
{
SCH_ERR_JRET
(
rspCode
);
SCH_ERR_JRET
(
rspCode
);
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
msg
;
job
->
res
=
rsp
;
job
->
resNumOfRows
=
rsp
->
numOfRows
;
SCH_ERR_JRET
(
schProcessOnDataFetched
(
job
));
SCH_ERR_JRET
(
schProcessOnDataFetched
(
job
));
break
;
break
;
}
default:
default:
qError
(
"unknown msg type:%d received"
,
msgType
);
qError
(
"unknown msg type:%d received"
,
msgType
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
...
@@ -600,14 +643,14 @@ _task_error:
...
@@ -600,14 +643,14 @@ _task_error:
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
_return:
_return:
code
=
schProcessOnJobFailure
(
job
);
code
=
schProcessOnJobFailure
(
job
,
code
);
return
code
;
return
code
;
}
}
int32_t
schLaunchTask
(
S
QueryJob
*
job
,
SQuery
Task
*
task
)
{
int32_t
schLaunchTask
(
S
SchJob
*
job
,
SSch
Task
*
task
)
{
SSubplan
*
plan
=
task
->
plan
;
SSubplan
*
plan
=
task
->
plan
;
SCH_ERR_RET
(
qSubPlanToString
(
plan
,
&
task
->
msg
,
&
task
->
msgLen
));
SCH_ERR_RET
(
qSubPlanToString
(
plan
,
&
task
->
msg
,
&
task
->
msgLen
));
if
(
plan
->
execEpSet
.
numOfEps
<=
0
)
{
if
(
plan
->
execEpSet
.
numOfEps
<=
0
)
{
...
@@ -630,10 +673,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
...
@@ -630,10 +673,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schLaunchJob
(
S
Query
Job
*
job
)
{
int32_t
schLaunchJob
(
S
Sch
Job
*
job
)
{
S
Query
Level
*
level
=
taosArrayGet
(
job
->
levels
,
job
->
levelIdx
);
S
Sch
Level
*
level
=
taosArrayGet
(
job
->
levels
,
job
->
levelIdx
);
for
(
int32_t
i
=
0
;
i
<
level
->
taskNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
level
->
taskNum
;
++
i
)
{
S
Query
Task
*
task
=
taosArrayGet
(
level
->
subTasks
,
i
);
S
Sch
Task
*
task
=
taosArrayGet
(
level
->
subTasks
,
i
);
SCH_ERR_RET
(
schLaunchTask
(
job
,
task
));
SCH_ERR_RET
(
schLaunchTask
(
job
,
task
));
}
}
...
@@ -642,10 +685,10 @@ int32_t schLaunchJob(SQueryJob *job) {
...
@@ -642,10 +685,10 @@ int32_t schLaunchJob(SQueryJob *job) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
void
schDropJobAllTasks
(
S
Query
Job
*
job
)
{
void
schDropJobAllTasks
(
S
Sch
Job
*
job
)
{
void
*
pIter
=
taosHashIterate
(
job
->
succTasks
,
NULL
);
void
*
pIter
=
taosHashIterate
(
job
->
succTasks
,
NULL
);
while
(
pIter
)
{
while
(
pIter
)
{
S
QueryTask
*
task
=
*
(
SQuery
Task
**
)
pIter
;
S
SchTask
*
task
=
*
(
SSch
Task
**
)
pIter
;
schAsyncSendMsg
(
job
,
task
,
TDMT_VND_DROP_TASK
);
schAsyncSendMsg
(
job
,
task
,
TDMT_VND_DROP_TASK
);
...
@@ -654,7 +697,7 @@ void schDropJobAllTasks(SQueryJob *job) {
...
@@ -654,7 +697,7 @@ void schDropJobAllTasks(SQueryJob *job) {
pIter
=
taosHashIterate
(
job
->
failTasks
,
NULL
);
pIter
=
taosHashIterate
(
job
->
failTasks
,
NULL
);
while
(
pIter
)
{
while
(
pIter
)
{
S
QueryTask
*
task
=
*
(
SQuery
Task
**
)
pIter
;
S
SchTask
*
task
=
*
(
SSch
Task
**
)
pIter
;
schAsyncSendMsg
(
job
,
task
,
TDMT_VND_DROP_TASK
);
schAsyncSendMsg
(
job
,
task
,
TDMT_VND_DROP_TASK
);
...
@@ -680,7 +723,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
...
@@ -680,7 +723,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
}
}
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
)
{
int32_t
scheduleExecJob
Impl
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
bool
syncSchedule
)
{
if
(
NULL
==
transport
||
NULL
==
transport
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
)
{
if
(
NULL
==
transport
||
NULL
==
transport
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
...
@@ -690,11 +733,12 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
...
@@ -690,11 +733,12 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
}
}
int32_t
code
=
0
;
int32_t
code
=
0
;
S
QueryJob
*
job
=
calloc
(
1
,
sizeof
(
SQuery
Job
));
S
SchJob
*
job
=
calloc
(
1
,
sizeof
(
SSch
Job
));
if
(
NULL
==
job
)
{
if
(
NULL
==
job
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
job
->
attr
.
syncSchedule
=
syncSchedule
;
job
->
transport
=
transport
;
job
->
transport
=
transport
;
job
->
qnodeList
=
qnodeList
;
job
->
qnodeList
=
qnodeList
;
...
@@ -720,53 +764,103 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
...
@@ -720,53 +764,103 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
tsem_init
(
&
job
->
rspSem
,
0
,
0
);
tsem_init
(
&
job
->
rspSem
,
0
,
0
);
if
(
0
!=
taosHashPut
(
schMgmt
.
jobs
,
&
job
->
queryId
,
sizeof
(
job
->
queryId
),
&
job
,
POINTER_BYTES
))
{
code
=
taosHashPut
(
schMgmt
.
jobs
,
&
job
->
queryId
,
sizeof
(
job
->
queryId
),
&
job
,
POINTER_BYTES
);
if
(
0
!=
code
)
{
if
(
HASH_NODE_EXIST
(
code
))
{
qError
(
"taosHashPut queryId:%"
PRIx64
" already exist"
,
job
->
queryId
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
else
{
qError
(
"taosHashPut queryId:%"
PRIx64
" failed"
,
job
->
queryId
);
qError
(
"taosHashPut queryId:%"
PRIx64
" failed"
,
job
->
queryId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
}
}
job
->
status
=
JOB_TASK_STATUS_NOT_START
;
job
->
status
=
JOB_TASK_STATUS_NOT_START
;
SCH_ERR_JRET
(
schLaunchJob
(
job
));
SCH_ERR_JRET
(
schLaunchJob
(
job
));
*
(
SQueryJob
**
)
pJob
=
job
;
*
(
SSchJob
**
)
pJob
=
job
;
if
(
syncSchedule
)
{
tsem_wait
(
&
job
->
rspSem
);
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
_return:
_return:
*
(
S
Query
Job
**
)
pJob
=
NULL
;
*
(
S
Sch
Job
**
)
pJob
=
NULL
;
scheduleFreeJob
(
job
);
scheduleFreeJob
(
job
);
SCH_RET
(
code
);
SCH_RET
(
code
);
}
}
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
uint64_t
*
numOfRows
)
{
*
numOfRows
=
0
;
SCH_ERR_RET
(
scheduleExecJobImpl
(
transport
,
qnodeList
,
pDag
,
pJob
,
true
));
SSchJob
*
job
=
*
(
SSchJob
**
)
pJob
;
*
numOfRows
=
job
->
resNumOfRows
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
scheduleAsyncExecJob
(
void
*
transport
,
SArray
*
qnodeList
,
SQueryDag
*
pDag
,
void
**
pJob
)
{
return
scheduleExecJobImpl
(
transport
,
qnodeList
,
pDag
,
pJob
,
false
);
}
int32_t
scheduleFetchRows
(
void
*
pJob
,
void
**
data
)
{
int32_t
scheduleFetchRows
(
void
*
pJob
,
void
**
data
)
{
if
(
NULL
==
pJob
||
NULL
==
data
)
{
if
(
NULL
==
pJob
||
NULL
==
data
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
)
;
}
}
S
Query
Job
*
job
=
pJob
;
S
Sch
Job
*
job
=
pJob
;
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
!
job
->
attr
.
needFetch
)
{
qError
(
"no need to fetch data"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
job
->
status
==
JOB_TASK_STATUS_FAILED
)
{
job
->
res
=
NULL
;
SCH_RET
(
job
->
errCode
);
}
if
(
job
->
status
==
JOB_TASK_STATUS_SUCCEED
)
{
job
->
res
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
if
(
atomic_val_compare_exchange_32
(
&
job
->
userFetch
,
0
,
1
)
!=
0
)
{
if
(
atomic_val_compare_exchange_32
(
&
job
->
userFetch
,
0
,
1
)
!=
0
)
{
qError
(
"prior fetching not finished"
);
qError
(
"prior fetching not finished"
);
return
TSDB_CODE_QRY_APP_ERROR
;
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
)
;
}
}
if
(
job
->
status
==
JOB_TASK_STATUS_SUCCEED
)
{
if
(
job
->
status
==
JOB_TASK_STATUS_
PARTIAL_
SUCCEED
)
{
SCH_ERR_JRET
(
schFetchFromRemote
(
job
));
SCH_ERR_JRET
(
schFetchFromRemote
(
job
));
}
}
tsem_wait
(
&
job
->
rspSem
);
tsem_wait
(
&
job
->
rspSem
);
if
(
job
->
status
==
JOB_TASK_STATUS_FAILED
)
{
code
=
job
->
errCode
;
}
if
(
job
->
res
&&
((
SRetrieveTableRsp
*
)
job
->
res
)
->
completed
)
{
job
->
status
=
JOB_TASK_STATUS_SUCCEED
;
}
*
data
=
job
->
res
;
*
data
=
job
->
res
;
job
->
res
=
NULL
;
job
->
res
=
NULL
;
_return:
_return:
atomic_val_compare_exchange_32
(
&
job
->
userFetch
,
1
,
0
);
atomic_val_compare_exchange_32
(
&
job
->
userFetch
,
1
,
0
);
return
code
;
SCH_RET
(
code
)
;
}
}
int32_t
scheduleCancelJob
(
void
*
pJob
)
{
int32_t
scheduleCancelJob
(
void
*
pJob
)
{
...
@@ -782,7 +876,7 @@ void scheduleFreeJob(void *pJob) {
...
@@ -782,7 +876,7 @@ void scheduleFreeJob(void *pJob) {
return
;
return
;
}
}
S
Query
Job
*
job
=
pJob
;
S
Sch
Job
*
job
=
pJob
;
if
(
job
->
status
>
0
)
{
if
(
job
->
status
>
0
)
{
if
(
0
!=
taosHashRemove
(
schMgmt
.
jobs
,
&
job
->
queryId
,
sizeof
(
job
->
queryId
)))
{
if
(
0
!=
taosHashRemove
(
schMgmt
.
jobs
,
&
job
->
queryId
,
sizeof
(
job
->
queryId
)))
{
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
408b1168
...
@@ -30,10 +30,16 @@
...
@@ -30,10 +30,16 @@
#include "scheduler.h"
#include "scheduler.h"
#include "tep.h"
#include "tep.h"
#include "trpc.h"
#include "trpc.h"
#include "schedulerInt.h"
#include "stub.h"
#include "addr_any.h"
namespace
{
namespace
{
void
mockBuildDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x111111111111
;
extern
"C"
int32_t
schHandleRspMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
void
schtBuildQueryDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x0000000000000001
;
dag
->
queryId
=
qId
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
dag
->
numOfSubplans
=
2
;
...
@@ -45,22 +51,26 @@ void mockBuildDag(SQueryDag *dag) {
...
@@ -45,22 +51,26 @@ void mockBuildDag(SQueryDag *dag) {
SSubplan
mergePlan
=
{
0
};
SSubplan
mergePlan
=
{
0
};
scanPlan
.
id
.
queryId
=
qId
;
scanPlan
.
id
.
queryId
=
qId
;
scanPlan
.
id
.
templateId
=
0x
2222222222
;
scanPlan
.
id
.
templateId
=
0x
0000000000000002
;
scanPlan
.
id
.
subplanId
=
0x
3333333333
;
scanPlan
.
id
.
subplanId
=
0x
0000000000000003
;
scanPlan
.
type
=
QUERY_TYPE_SCAN
;
scanPlan
.
type
=
QUERY_TYPE_SCAN
;
scanPlan
.
level
=
1
;
scanPlan
.
level
=
1
;
scanPlan
.
execEpSet
.
numOfEps
=
1
;
scanPlan
.
execEpSet
.
numOfEps
=
1
;
scanPlan
.
execEpSet
.
port
[
0
]
=
6030
;
strcpy
(
scanPlan
.
execEpSet
.
fqdn
[
0
],
"ep0"
);
scanPlan
.
pChildern
=
NULL
;
scanPlan
.
pChildern
=
NULL
;
scanPlan
.
pParents
=
taosArrayInit
(
1
,
POINTER_BYTES
);
scanPlan
.
pParents
=
taosArrayInit
(
1
,
POINTER_BYTES
);
scanPlan
.
pNode
=
(
SPhyNode
*
)
calloc
(
1
,
sizeof
(
SPhyNode
));
mergePlan
.
id
.
queryId
=
qId
;
mergePlan
.
id
.
queryId
=
qId
;
mergePlan
.
id
.
templateId
=
0x4444444444
;
mergePlan
.
id
.
templateId
=
0x4444444444
;
mergePlan
.
id
.
subplanId
=
0x5555555555
;
mergePlan
.
id
.
subplanId
=
0x5555555555
;
mergePlan
.
type
=
QUERY_TYPE_MERGE
;
mergePlan
.
type
=
QUERY_TYPE_MERGE
;
mergePlan
.
level
=
0
;
mergePlan
.
level
=
0
;
mergePlan
.
execEpSet
.
numOfEps
=
1
;
mergePlan
.
execEpSet
.
numOfEps
=
0
;
mergePlan
.
pChildern
=
taosArrayInit
(
1
,
POINTER_BYTES
);
mergePlan
.
pChildern
=
taosArrayInit
(
1
,
POINTER_BYTES
);
mergePlan
.
pParents
=
NULL
;
mergePlan
.
pParents
=
NULL
;
mergePlan
.
pNode
=
(
SPhyNode
*
)
calloc
(
1
,
sizeof
(
SPhyNode
));
SSubplan
*
mergePointer
=
(
SSubplan
*
)
taosArrayPush
(
merge
,
&
mergePlan
);
SSubplan
*
mergePointer
=
(
SSubplan
*
)
taosArrayPush
(
merge
,
&
mergePlan
);
SSubplan
*
scanPointer
=
(
SSubplan
*
)
taosArrayPush
(
scan
,
&
scanPlan
);
SSubplan
*
scanPointer
=
(
SSubplan
*
)
taosArrayPush
(
scan
,
&
scanPlan
);
...
@@ -72,9 +82,120 @@ void mockBuildDag(SQueryDag *dag) {
...
@@ -72,9 +82,120 @@ void mockBuildDag(SQueryDag *dag) {
taosArrayPush
(
dag
->
pSubplans
,
&
scan
);
taosArrayPush
(
dag
->
pSubplans
,
&
scan
);
}
}
void
schtBuildInsertDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x0000000000000002
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
dag
->
pSubplans
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
inserta
=
taosArrayInit
(
dag
->
numOfSubplans
,
sizeof
(
SSubplan
));
SSubplan
insertPlan
[
2
]
=
{
0
};
insertPlan
[
0
].
id
.
queryId
=
qId
;
insertPlan
[
0
].
id
.
templateId
=
0x0000000000000003
;
insertPlan
[
0
].
id
.
subplanId
=
0x0000000000000004
;
insertPlan
[
0
].
type
=
QUERY_TYPE_MODIFY
;
insertPlan
[
0
].
level
=
0
;
insertPlan
[
0
].
execEpSet
.
numOfEps
=
1
;
insertPlan
[
0
].
execEpSet
.
port
[
0
]
=
6030
;
strcpy
(
insertPlan
[
0
].
execEpSet
.
fqdn
[
0
],
"ep0"
);
insertPlan
[
0
].
pChildern
=
NULL
;
insertPlan
[
0
].
pParents
=
NULL
;
insertPlan
[
0
].
pNode
=
NULL
;
insertPlan
[
0
].
pDataSink
=
(
SDataSink
*
)
calloc
(
1
,
sizeof
(
SDataSink
));
insertPlan
[
1
].
id
.
queryId
=
qId
;
insertPlan
[
1
].
id
.
templateId
=
0x0000000000000003
;
insertPlan
[
1
].
id
.
subplanId
=
0x0000000000000005
;
insertPlan
[
1
].
type
=
QUERY_TYPE_MODIFY
;
insertPlan
[
1
].
level
=
0
;
insertPlan
[
1
].
execEpSet
.
numOfEps
=
1
;
insertPlan
[
1
].
execEpSet
.
port
[
0
]
=
6030
;
strcpy
(
insertPlan
[
1
].
execEpSet
.
fqdn
[
0
],
"ep1"
);
insertPlan
[
1
].
pChildern
=
NULL
;
insertPlan
[
1
].
pParents
=
NULL
;
insertPlan
[
1
].
pNode
=
NULL
;
insertPlan
[
1
].
pDataSink
=
(
SDataSink
*
)
calloc
(
1
,
sizeof
(
SDataSink
));
taosArrayPush
(
inserta
,
&
insertPlan
[
0
]);
taosArrayPush
(
inserta
,
&
insertPlan
[
1
]);
taosArrayPush
(
dag
->
pSubplans
,
&
inserta
);
}
int32_t
schtPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
,
int32_t
*
len
)
{
*
str
=
(
char
*
)
calloc
(
1
,
20
);
*
len
=
20
;
return
0
;
}
int32_t
schtExecNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SEpAddr
*
ep
)
{
return
0
;
}
void
schtSetPlanToString
()
{
static
Stub
stub
;
stub
.
set
(
qSubPlanToString
,
schtPlanToString
);
{
AddrAny
any
(
"libplanner.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^qSubPlanToString$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
schtPlanToString
);
}
}
}
void
schtSetExecNode
()
{
static
Stub
stub
;
stub
.
set
(
qSetSubplanExecutionNode
,
schtExecNode
);
{
AddrAny
any
(
"libplanner.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^qSetSubplanExecutionNode$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
schtExecNode
);
}
}
}
void
*
schtSendRsp
(
void
*
param
)
{
SSchJob
*
job
=
NULL
;
int32_t
code
=
0
;
while
(
true
)
{
job
=
*
(
SSchJob
**
)
param
;
if
(
job
)
{
break
;
}
usleep
(
1000
);
}
void
*
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SShellSubmitRspMsg
rsp
=
{
0
};
rsp
.
affectedRows
=
10
;
schHandleRspMsg
(
job
,
task
,
TDMT_VND_SUBMIT
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
return
NULL
;
}
void
*
pInsertJob
=
NULL
;
}
}
TEST
(
testCase
,
normalCase
)
{
TEST
(
queryTest
,
normalCase
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
void
*
mockPointer
=
(
void
*
)
0x1
;
char
*
clusterId
=
"cluster1"
;
char
*
clusterId
=
"cluster1"
;
char
*
dbname
=
"1.db1"
;
char
*
dbname
=
"1.db1"
;
...
@@ -84,16 +205,133 @@ TEST(testCase, normalCase) {
...
@@ -84,16 +205,133 @@ TEST(testCase, normalCase) {
SQueryDag
dag
=
{
0
};
SQueryDag
dag
=
{
0
};
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEpAddr
));
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEpAddr
));
SEpAddr
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
int32_t
code
=
schedulerInit
(
NULL
);
int32_t
code
=
schedulerInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
mockBuildDag
(
&
dag
);
schtBuildQueryDag
(
&
dag
);
schtSetPlanToString
();
schtSetExecNode
();
code
=
scheduleAsyncExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
pJob
);
ASSERT_EQ
(
code
,
0
);
SSchJob
*
job
=
(
SSchJob
*
)
pJob
;
void
*
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
scheduleExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
pJob
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
job
->
execTasks
,
NULL
);
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
schHandleRspMsg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
SRetrieveTableRsp
rsp
=
{
0
};
rsp
.
completed
=
1
;
rsp
.
numOfRows
=
10
;
code
=
schHandleRspMsg
(
job
,
NULL
,
TDMT_VND_FETCH
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
void
*
data
=
NULL
;
code
=
scheduleFetchRows
(
job
,
&
data
);
ASSERT_EQ
(
code
,
0
);
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
ASSERT_EQ
(
pRsp
->
completed
,
1
);
ASSERT_EQ
(
pRsp
->
numOfRows
,
10
);
data
=
NULL
;
code
=
scheduleFetchRows
(
job
,
&
data
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
data
,
(
void
*
)
NULL
);
scheduleFreeJob
(
pJob
);
}
}
TEST
(
insertTest
,
normalCase
)
{
void
*
mockPointer
=
(
void
*
)
0x1
;
char
*
clusterId
=
"cluster1"
;
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
SVgroupInfo
vgInfo
=
{
0
};
SQueryDag
dag
=
{
0
};
uint64_t
numOfRows
=
0
;
SArray
*
qnodeList
=
taosArrayInit
(
1
,
sizeof
(
SEpAddr
));
SEpAddr
qnodeAddr
=
{
0
};
strcpy
(
qnodeAddr
.
fqdn
,
"qnode0.ep"
);
qnodeAddr
.
port
=
6031
;
taosArrayPush
(
qnodeList
,
&
qnodeAddr
);
int32_t
code
=
schedulerInit
(
NULL
);
ASSERT_EQ
(
code
,
0
);
schtBuildInsertDag
(
&
dag
);
schtSetPlanToString
();
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_t
thread1
;
pthread_create
(
&
(
thread1
),
&
thattr
,
schtSendRsp
,
&
pInsertJob
);
code
=
scheduleExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
pInsertJob
,
&
numOfRows
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
numOfRows
,
20
);
scheduleFreeJob
(
pInsertJob
);
}
int
main
(
int
argc
,
char
**
argv
)
{
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
return
RUN_ALL_TESTS
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录