Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3c0a7e63
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3c0a7e63
编写于
12月 28, 2021
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/tq
上级
7f2efea9
5618e7e6
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
1049 addition
and
450 deletion
+1049
-450
include/common/tmsg.h
include/common/tmsg.h
+6
-6
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+1
-1
source/dnode/vnode/impl/inc/vnodeDef.h
source/dnode/vnode/impl/inc/vnodeDef.h
+1
-1
source/dnode/vnode/impl/inc/vnodeQuery.h
source/dnode/vnode/impl/inc/vnodeQuery.h
+3
-0
source/dnode/vnode/impl/src/vnodeQuery.c
source/dnode/vnode/impl/src/vnodeQuery.c
+17
-3
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+1
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+62
-28
source/libs/parser/src/dCDAstProcess.c
source/libs/parser/src/dCDAstProcess.c
+2
-2
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+2
-0
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+46
-7
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+310
-253
source/libs/qworker/test/qworkerTests.cpp
source/libs/qworker/test/qworkerTests.cpp
+363
-5
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+7
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+221
-135
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+7
-7
未找到文件。
include/common/tmsg.h
浏览文件 @
3c0a7e63
...
...
@@ -1018,7 +1018,7 @@ typedef struct {
}
SUpdateTagValRsp
;
typedef
struct
SSubQueryMsg
{
uint64_t
s
cheduler
Id
;
uint64_t
sId
;
uint64_t
queryId
;
uint64_t
taskId
;
uint32_t
contentLen
;
...
...
@@ -1026,7 +1026,7 @@ typedef struct SSubQueryMsg {
}
SSubQueryMsg
;
typedef
struct
SResReadyMsg
{
uint64_t
s
cheduler
Id
;
uint64_t
sId
;
uint64_t
queryId
;
uint64_t
taskId
;
}
SResReadyMsg
;
...
...
@@ -1036,13 +1036,13 @@ typedef struct SResReadyRsp {
}
SResReadyRsp
;
typedef
struct
SResFetchMsg
{
uint64_t
s
cheduler
Id
;
uint64_t
sId
;
uint64_t
queryId
;
uint64_t
taskId
;
}
SResFetchMsg
;
typedef
struct
SSchTasksStatusMsg
{
uint64_t
s
cheduler
Id
;
uint64_t
sId
;
}
SSchTasksStatusMsg
;
typedef
struct
STaskStatus
{
...
...
@@ -1057,7 +1057,7 @@ typedef struct SSchedulerStatusRsp {
}
SSchedulerStatusRsp
;
typedef
struct
STaskCancelMsg
{
uint64_t
s
cheduler
Id
;
uint64_t
sId
;
uint64_t
queryId
;
uint64_t
taskId
;
}
STaskCancelMsg
;
...
...
@@ -1067,7 +1067,7 @@ typedef struct STaskCancelRsp {
}
STaskCancelRsp
;
typedef
struct
STaskDropMsg
{
uint64_t
s
cheduler
Id
;
uint64_t
sId
;
uint64_t
queryId
;
uint64_t
taskId
;
}
STaskDropMsg
;
...
...
source/client/src/clientEnv.c
浏览文件 @
3c0a7e63
...
...
@@ -227,7 +227,7 @@ void taos_init_imp(void) {
rpcInit
();
SCatalogCfg
cfg
=
{.
enableVgroupCache
=
true
,
.
maxDBCacheNum
=
100
,
.
maxTblCacheNum
=
100
};
SCatalogCfg
cfg
=
{.
maxDBCacheNum
=
100
,
.
maxTblCacheNum
=
100
};
catalogInit
(
&
cfg
);
tscDebug
(
"starting to initialize TAOS driver, local ep: %s"
,
tsLocalEp
);
...
...
source/dnode/vnode/impl/inc/vnodeDef.h
浏览文件 @
3c0a7e63
...
...
@@ -73,7 +73,7 @@ struct SVnode {
SVnodeSync
*
pSync
;
SVnodeFS
*
pFs
;
tsem_t
canCommit
;
void
*
pQuery
;
SQHandle
*
pQuery
;
};
int
vnodeScheduleTask
(
SVnodeTask
*
task
);
...
...
source/dnode/vnode/impl/inc/vnodeQuery.h
浏览文件 @
3c0a7e63
...
...
@@ -22,6 +22,9 @@ extern "C" {
#include "vnodeInt.h"
#include "qworker.h"
typedef
struct
SQWorkerMgmt
SQHandle
;
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
#ifdef __cplusplus
...
...
source/dnode/vnode/impl/src/vnodeQuery.c
浏览文件 @
3c0a7e63
...
...
@@ -22,13 +22,27 @@ int vnodeQueryOpen(SVnode *pVnode) {
int
vnodeProcessQueryReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
vInfo
(
"query message is processed"
);
qWorkerProcessQueryMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
return
0
;
return
qWorkerProcessQueryMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
}
int
vnodeProcessFetchReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
vInfo
(
"fetch message is processed"
);
qWorkerProcessFetchMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_FETCH
:
return
qWorkerProcessFetchMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_RES_READY
:
return
qWorkerProcessReadyMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_TASKS_STATUS
:
return
qWorkerProcessStatusMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_CANCEL_TASK
:
return
qWorkerProcessCancelMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_DROP_TASK
:
return
qWorkerProcessDropMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
default:
vError
(
"unknown msg type:%d in fetch queue"
,
pMsg
->
msgType
);
return
TSDB_CODE_VND_APP_ERROR
;
break
;
}
return
0
;
}
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
3c0a7e63
...
...
@@ -46,7 +46,6 @@ typedef struct STableMetaCache {
}
STableMetaCache
;
typedef
struct
SCatalog
{
SVgroupListCache
vgroupCache
;
SDBVgroupCache
dbCache
;
STableMetaCache
tableCache
;
}
SCatalog
;
...
...
@@ -67,6 +66,7 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define CTG_CACHE_ENABLED() (ctgMgmt.cfg.maxDBCacheNum > 0 || ctgMgmt.cfg.maxTblCacheNum > 0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
3c0a7e63
...
...
@@ -146,8 +146,44 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
}
}
int32_t
ctgGetTableMetaFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
STableMetaOutput
*
output
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pDBName
||
NULL
==
pTableName
||
NULL
==
output
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
char
tbFullName
[
TSDB_TABLE_FNAME_LEN
];
snprintf
(
tbFullName
,
sizeof
(
tbFullName
),
"%s.%s"
,
pDBName
,
pTableName
);
SBuildTableMetaInput
bInput
=
{.
vgId
=
0
,
.
tableFullName
=
tbFullName
};
char
*
msg
=
NULL
;
SEpSet
*
pVnodeEpSet
=
NULL
;
int32_t
msgLen
=
0
;
CTG_ERR_RET
(
queryBuildMsg
[
TDMT_MND_STB_META
](
&
bInput
,
&
msg
,
0
,
&
msgLen
));
SRpcMsg
rpcMsg
=
{
.
msgType
=
TDMT_MND_STB_META
,
.
pCont
=
msg
,
.
contLen
=
msgLen
,
};
SRpcMsg
rpcRsp
=
{
0
};
int32_t
ctgGetTableMetaFromMnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SVgroupInfo
*
vgroupInfo
,
STableMetaOutput
*
output
)
{
rpcSendRecv
(
pRpc
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
ctgError
(
"error rsp for table meta, code:%x"
,
rpcRsp
.
code
);
CTG_ERR_RET
(
rpcRsp
.
code
);
}
CTG_ERR_RET
(
queryProcessMsgRsp
[
TDMT_MND_STB_META
](
output
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTableMetaFromVnode
(
struct
SCatalog
*
pCatalog
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
const
char
*
pTableName
,
SVgroupInfo
*
vgroupInfo
,
STableMetaOutput
*
output
)
{
if
(
NULL
==
pCatalog
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
pDBName
||
NULL
==
pTableName
||
NULL
==
vgroupInfo
||
NULL
==
output
)
{
CTG_ERR_RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
...
...
@@ -307,7 +343,9 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
ctgError
(
"init hash[%d] for tablemeta cache failed"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
}
if
(
NULL
==
pCatalog
->
tableCache
.
stableCache
)
{
pCatalog
->
tableCache
.
stableCache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxTblCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
tableCache
.
stableCache
)
{
ctgError
(
"init hash[%d] for stablemeta cache failed"
,
ctgMgmt
.
cfg
.
maxTblCacheNum
);
...
...
@@ -318,55 +356,51 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
if
(
output
->
metaNum
==
2
)
{
if
(
taosHashPut
(
pCatalog
->
tableCache
.
cache
,
output
->
ctbFname
,
strlen
(
output
->
ctbFname
),
&
output
->
ctbMeta
,
sizeof
(
output
->
ctbMeta
))
!=
0
)
{
ctgError
(
"push ctable[%s] to table cache failed"
,
output
->
ctbFname
);
goto
error_exit
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
if
(
TSDB_SUPER_TABLE
!=
output
->
tbMeta
->
tableType
)
{
ctgError
(
"table type[%d] error, expected:%d"
,
output
->
tbMeta
->
tableType
,
TSDB_SUPER_TABLE
);
goto
error_exit
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
)
;
}
}
int32_t
tbSize
=
sizeof
(
*
output
->
tbMeta
)
+
sizeof
(
SSchema
)
*
(
output
->
tbMeta
->
tableInfo
.
numOfColumns
+
output
->
tbMeta
->
tableInfo
.
numOfTags
);
if
(
taosHashPut
(
pCatalog
->
tableCache
.
cache
,
output
->
tbFname
,
strlen
(
output
->
tbFname
),
output
->
tbMeta
,
tbSize
)
!=
0
)
{
ctgError
(
"push table[%s] to table cache failed"
,
output
->
tbFname
);
goto
error_exit
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
if
(
TSDB_SUPER_TABLE
==
output
->
tbMeta
->
tableType
)
{
if
(
taosHashPut
(
pCatalog
->
tableCache
.
stableCache
,
&
output
->
tbMeta
->
suid
,
sizeof
(
output
->
tbMeta
->
suid
),
&
output
->
tbMeta
,
POINTER_BYTES
)
!=
0
)
{
ctgError
(
"push suid[%"
PRIu64
"] to stable cache failed"
,
output
->
tbMeta
->
suid
);
goto
error_exit
;
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
)
;
}
}
return
TSDB_CODE_SUCCESS
;
error_exit:
if
(
pCatalog
->
vgroupCache
.
cache
)
{
taosHashCleanup
(
pCatalog
->
vgroupCache
.
cache
);
pCatalog
->
vgroupCache
.
cache
=
NULL
;
}
pCatalog
->
vgroupCache
.
vgroupVersion
=
CTG_DEFAULT_INVALID_VERSION
;
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
int32_t
catalogInit
(
SCatalogCfg
*
cfg
)
{
ctgMgmt
.
pCluster
=
taosHashInit
(
CTG_DEFAULT_CACHE_CLUSTER_NUMBER
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
ctgMgmt
.
pCluster
)
{
CTG_ERR_
LRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
,
"init %d cluster cache failed"
,
CTG_DEFAULT_CACHE_CLUSTER_NUMBER
);
if
(
ctgMgmt
.
pCluster
)
{
ctgError
(
"catalog already init"
);
CTG_ERR_
RET
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
if
(
cfg
)
{
memcpy
(
&
ctgMgmt
.
cfg
,
cfg
,
sizeof
(
*
cfg
));
}
else
{
ctgMgmt
.
cfg
.
enableVgroupCache
=
true
;
ctgMgmt
.
cfg
.
maxDBCacheNum
=
CTG_DEFAULT_CACHE_DB_NUMBER
;
ctgMgmt
.
cfg
.
maxTblCacheNum
=
CTG_DEFAULT_CACHE_TABLEMETA_NUMBER
;
}
if
(
CTG_CACHE_ENABLED
())
{
ctgMgmt
.
pCluster
=
taosHashInit
(
CTG_DEFAULT_CACHE_CLUSTER_NUMBER
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
ctgMgmt
.
pCluster
)
{
CTG_ERR_LRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
,
"init %d cluster cache failed"
,
CTG_DEFAULT_CACHE_CLUSTER_NUMBER
);
}
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -381,21 +415,19 @@ int32_t catalogGetHandle(const char* clusterId , struct SCatalog** catalogHandle
}
size_t
clen
=
strlen
(
clusterId
);
SCatalog
*
clusterCtg
=
(
SCatalog
*
)
taosHashGet
(
ctgMgmt
.
pCluster
,
clusterId
,
clen
);
SCatalog
*
*
ctg
=
(
SCatalog
*
*
)
taosHashGet
(
ctgMgmt
.
pCluster
,
clusterId
,
clen
);
if
(
c
lusterCtg
)
{
*
catalogHandle
=
clusterC
tg
;
if
(
c
tg
&&
(
*
ctg
)
)
{
*
catalogHandle
=
*
c
tg
;
return
TSDB_CODE_SUCCESS
;
}
clusterCtg
=
calloc
(
1
,
sizeof
(
*
clusterCt
g
));
SCatalog
*
clusterCtg
=
calloc
(
1
,
sizeof
(
SCatalo
g
));
if
(
NULL
==
clusterCtg
)
{
ctgError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
*
clusterCt
g
));
ctgError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SCatalo
g
));
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
clusterCtg
->
vgroupCache
.
vgroupVersion
=
CTG_DEFAULT_INVALID_VERSION
;
if
(
taosHashPut
(
ctgMgmt
.
pCluster
,
clusterId
,
clen
,
&
clusterCtg
,
POINTER_BYTES
))
{
ctgError
(
"put cluster %s cache to hash failed"
,
clusterId
);
tfree
(
clusterCtg
);
...
...
@@ -443,7 +475,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
}
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
pCatalog
->
dbCache
.
cache
=
taosHashInit
(
CTG_DEFAULT_CACHE_DB_NUMBER
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
pCatalog
->
dbCache
.
cache
=
taosHashInit
(
ctgMgmt
.
cfg
.
maxDBCacheNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
ctgError
(
"init hash[%d] for db cache failed"
,
CTG_DEFAULT_CACHE_DB_NUMBER
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
...
...
@@ -515,7 +547,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
STableMetaOutput
output
=
{
0
};
CTG_ERR_RET
(
ctgGetTableMetaFromMnode
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
pTableName
,
&
vgroupInfo
,
&
output
));
//CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));
CTG_ERR_RET
(
ctgGetTableMetaFromMnode
(
pCatalog
,
pRpc
,
pMgmtEps
,
pDBName
,
pTableName
,
&
output
));
CTG_ERR_RET
(
ctgUpdateTableMetaCache
(
pCatalog
,
&
output
));
...
...
source/libs/parser/src/dCDAstProcess.c
浏览文件 @
3c0a7e63
...
...
@@ -313,7 +313,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
return
code
;
}
co
de
=
tNameGetTableName
(
&
name
,
pCreateTableInfo
->
tagdata
.
name
);
co
nst
char
*
pSTableName
=
tNameGetTableName
(
&
name
);
SArray
*
pValList
=
pCreateTableInfo
->
pTagVals
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -326,7 +326,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
char
dbName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
&
name
,
dbName
);
catalogGetTableMeta
(
pCtx
->
pCatalog
,
pCtx
->
pTransporter
,
&
pCtx
->
mgmtEpSet
,
dbName
,
p
CreateTableInfo
->
tagdata
.
n
ame
,
&
pSuperTableMeta
);
catalogGetTableMeta
(
pCtx
->
pCatalog
,
pCtx
->
pTransporter
,
&
pCtx
->
mgmtEpSet
,
dbName
,
p
STableN
ame
,
&
pSuperTableMeta
);
// too long tag values will return invalid sql, not be truncated automatically
SSchema
*
pTagSchema
=
getTableTagSchema
(
pSuperTableMeta
);
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
3c0a7e63
...
...
@@ -266,9 +266,11 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
void
initQueryModuleMsgHandle
()
{
queryBuildMsg
[
TDMT_VND_TABLE_META
]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TDMT_MND_STB_META
]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TDMT_MND_USE_DB
]
=
queryBuildUseDbMsg
;
queryProcessMsgRsp
[
TDMT_VND_TABLE_META
]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TDMT_MND_STB_META
]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TDMT_MND_USE_DB
]
=
queryProcessUseDBRsp
;
}
...
...
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
3c0a7e63
...
...
@@ -42,25 +42,41 @@ enum {
QW_WRITE
,
};
typedef
struct
SQWorkerTaskStatus
{
enum
{
QW_EXIST_ACQUIRE
=
1
,
QW_EXIST_RET_ERR
,
};
enum
{
QW_NOT_EXIST_RET_ERR
=
1
,
QW_NOT_EXIST_ADD
,
};
enum
{
QW_ADD_RET_ERR
=
1
,
QW_ADD_ACQUIRE
,
};
typedef
struct
SQWTaskStatus
{
SRWLatch
lock
;
int32_t
code
;
int8_t
status
;
int8_t
ready
;
bool
cancel
;
bool
drop
;
}
SQW
orker
TaskStatus
;
}
SQWTaskStatus
;
typedef
struct
SQWorkerResCache
{
SRWLatch
lock
;
void
*
data
;
}
SQWorkerResCache
;
typedef
struct
SQW
orker
SchStatus
{
typedef
struct
SQWSchStatus
{
int32_t
lastAccessTs
;
// timestamp in second
SRWLatch
tasksLock
;
SHashObj
*
tasksHash
;
// key:queryId+taskId, value: SQWorkerTaskStatus
}
SQW
orker
SchStatus
;
}
SQWSchStatus
;
// Qnode/Vnode level task management
typedef
struct
SQWorkerMgmt
{
...
...
@@ -71,7 +87,7 @@ typedef struct SQWorkerMgmt {
SHashObj
*
resHash
;
//key: queryId+taskId, value: SQWorkerResCache
}
SQWorkerMgmt
;
#define QW_GOT_RES_DATA(data) (
fals
e)
#define QW_GOT_RES_DATA(data) (
tru
e)
#define QW_LOW_RES_DATA(data) (false)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
...
...
@@ -86,8 +102,31 @@ typedef struct SQWorkerMgmt {
#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QW_LOCK(type, _lock) (QW_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define QW_UNLOCK(type, _lock) (QW_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
#define QW_LOCK(type, _lock) do { \
if (QW_READ == (type)) { \
if ((*(_lock)) < 0) assert(0); \
taosRLockLatch(_lock); \
qDebug("RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} else { \
if ((*(_lock)) < 0) assert(0); \
taosWLockLatch(_lock); \
qDebug("WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \
if ((*(_lock)) <= 0) assert(0); \
taosRUnLockLatch(_lock); \
qDebug("RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} else { \
if ((*(_lock)) <= 0) assert(0); \
taosWUnLockLatch(_lock); \
qDebug("WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} \
} while (0)
static
int32_t
qwAcquireScheduler
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWSchStatus
**
sch
,
int32_t
nOpt
);
#ifdef __cplusplus
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
3c0a7e63
...
...
@@ -4,38 +4,42 @@
#include "qworkerInt.h"
#include "planner.h"
int32_t
qw
CheckStatusSwitch
(
int8_t
oriStatus
,
int8_t
newStatus
)
{
int32_t
qw
ValidateStatus
(
int8_t
oriStatus
,
int8_t
newStatus
)
{
int32_t
code
=
0
;
if
(
oriStatus
==
newStatus
)
{
if
(
newStatus
==
JOB_TASK_STATUS_CANCELLING
)
{
return
TSDB_CODE_SUCCESS
;
}
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
switch
(
oriStatus
)
{
case
JOB_TASK_STATUS_NULL
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_NOT_START
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_NOT_START
:
if
(
newStatus
!=
JOB_TASK_STATUS_
EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_FAI
LED
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_
CANCEL
LED
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_EXECUTING
:
if
(
newStatus
!=
JOB_TASK_STATUS_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLING
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLING
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_PARTIAL_SUCCEED
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLING
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLED
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -49,6 +53,10 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) {
break
;
case
JOB_TASK_STATUS_CANCELLED
:
case
JOB_TASK_STATUS_DROPPING
:
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
break
;
default:
qError
(
"invalid task status:%d"
,
oriStatus
);
return
TSDB_CODE_QRY_APP_ERROR
;
...
...
@@ -58,17 +66,17 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) {
_return:
qError
(
"invalid task status
:%d"
,
ori
Status
);
qError
(
"invalid task status
, from %d to %d"
,
oriStatus
,
new
Status
);
QW_ERR_RET
(
code
);
}
int32_t
qwUpdateTaskInfo
(
SQW
orker
TaskStatus
*
task
,
int8_t
type
,
void
*
data
)
{
int32_t
qwUpdateTaskInfo
(
SQWTaskStatus
*
task
,
int8_t
type
,
void
*
data
)
{
int32_t
code
=
0
;
switch
(
type
)
{
case
QW_TASK_INFO_STATUS
:
{
int8_t
newStatus
=
*
(
int8_t
*
)
data
;
QW_ERR_RET
(
qw
CheckStatusSwitch
(
task
->
status
,
newStatus
));
QW_ERR_RET
(
qw
ValidateStatus
(
task
->
status
,
newStatus
));
task
->
status
=
newStatus
;
break
;
}
...
...
@@ -80,9 +88,9 @@ int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAddTaskRes
ult
(
SQWorkerMgmt
*
mgmt
,
uint64_t
queryId
,
uint64_t
task
Id
,
void
*
data
)
{
char
id
[
sizeof
(
q
ueryId
)
+
sizeof
(
task
Id
)]
=
{
0
};
QW_SET_QTID
(
id
,
q
ueryId
,
task
Id
);
int32_t
qwAddTaskRes
Cache
(
SQWorkerMgmt
*
mgmt
,
uint64_t
qId
,
uint64_t
t
Id
,
void
*
data
)
{
char
id
[
sizeof
(
q
Id
)
+
sizeof
(
t
Id
)]
=
{
0
};
QW_SET_QTID
(
id
,
q
Id
,
t
Id
);
SQWorkerResCache
resCache
=
{
0
};
resCache
.
data
=
data
;
...
...
@@ -90,7 +98,7 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v
QW_LOCK
(
QW_WRITE
,
&
mgmt
->
resLock
);
if
(
0
!=
taosHashPut
(
mgmt
->
resHash
,
id
,
sizeof
(
id
),
&
resCache
,
sizeof
(
SQWorkerResCache
)))
{
QW_UNLOCK
(
QW_WRITE
,
&
mgmt
->
resLock
);
qError
(
"taosHashPut queryId[%"
PRIx64
"] taskId[%"
PRIx64
"] to resHash failed"
,
q
ueryId
,
task
Id
);
qError
(
"taosHashPut queryId[%"
PRIx64
"] taskId[%"
PRIx64
"] to resHash failed"
,
q
Id
,
t
Id
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
...
...
@@ -99,37 +107,8 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwGetTaskResult
(
SQWorkerMgmt
*
mgmt
,
uint64_t
queryId
,
uint64_t
taskId
,
void
**
data
)
{
char
id
[
sizeof
(
queryId
)
+
sizeof
(
taskId
)]
=
{
0
};
QW_SET_QTID
(
id
,
queryId
,
taskId
);
SQWorkerResCache
*
resCache
=
taosHashGet
(
mgmt
->
resHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
resCache
)
{
qError
(
"no task res for queryId[%"
PRIx64
"] taskId[%"
PRIx64
"]"
,
queryId
,
taskId
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
*
data
=
resCache
->
data
;
return
TSDB_CODE_SUCCESS
;
}
static
FORCE_INLINE
int32_t
qwAcquireScheduler
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
schedulerId
,
SQWorkerSchStatus
**
sch
)
{
QW_LOCK
(
rwType
,
&
mgmt
->
schLock
);
*
sch
=
taosHashGet
(
mgmt
->
schHash
,
&
schedulerId
,
sizeof
(
schedulerId
));
if
(
NULL
==
(
*
sch
))
{
QW_LOCK
(
rwType
,
&
mgmt
->
schLock
);
return
TSDB_CODE_QRY_SCH_NOT_EXIST
;
}
return
TSDB_CODE_SUCCESS
;
}
static
FORCE_INLINE
int32_t
qwInsertAndAcquireScheduler
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
schedulerId
,
SQWorkerSchStatus
**
sch
)
{
SQWorkerSchStatus
newSch
=
{
0
};
static
int32_t
qwAddScheduler
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWSchStatus
**
sch
)
{
SQWSchStatus
newSch
=
{
0
};
newSch
.
tasksHash
=
taosHashInit
(
mgmt
->
cfg
.
maxSchTaskNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
newSch
.
tasksHash
)
{
qError
(
"taosHashInit %d failed"
,
mgmt
->
cfg
.
maxSchTaskNum
);
...
...
@@ -138,19 +117,18 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker
while
(
true
)
{
QW_LOCK
(
QW_WRITE
,
&
mgmt
->
schLock
);
int32_t
code
=
taosHashPut
(
mgmt
->
schHash
,
&
s
chedulerId
,
sizeof
(
scheduler
Id
),
&
newSch
,
sizeof
(
newSch
));
int32_t
code
=
taosHashPut
(
mgmt
->
schHash
,
&
s
Id
,
sizeof
(
s
Id
),
&
newSch
,
sizeof
(
newSch
));
if
(
0
!=
code
)
{
if
(
!
HASH_NODE_EXIST
(
code
))
{
QW_UNLOCK
(
QW_WRITE
,
&
mgmt
->
schLock
);
qError
(
"taosHashPut s
chedulerId[%"
PRIx64
"] to scheduleHash failed"
,
scheduler
Id
);
qError
(
"taosHashPut s
Id[%"
PRIx64
"] to scheduleHash failed"
,
s
Id
);
taosHashCleanup
(
newSch
.
tasksHash
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
}
QW_UNLOCK
(
QW_WRITE
,
&
mgmt
->
schLock
);
if
(
TSDB_CODE_SUCCESS
==
qwAcquireScheduler
(
rwType
,
mgmt
,
schedulerId
,
sch
))
{
taosHashCleanup
(
newSch
.
tasksHash
);
if
(
TSDB_CODE_SUCCESS
==
qwAcquireScheduler
(
rwType
,
mgmt
,
sId
,
sch
,
QW_NOT_EXIST_ADD
))
{
return
TSDB_CODE_SUCCESS
;
}
}
...
...
@@ -159,63 +137,122 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker
}
static
int32_t
qwAcquireScheduler
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWSchStatus
**
sch
,
int32_t
nOpt
)
{
QW_LOCK
(
rwType
,
&
mgmt
->
schLock
);
*
sch
=
taosHashGet
(
mgmt
->
schHash
,
&
sId
,
sizeof
(
sId
));
if
(
NULL
==
(
*
sch
))
{
QW_UNLOCK
(
rwType
,
&
mgmt
->
schLock
);
if
(
QW_NOT_EXIST_ADD
==
nOpt
)
{
return
qwAddScheduler
(
rwType
,
mgmt
,
sId
,
sch
);
}
else
if
(
QW_NOT_EXIST_RET_ERR
==
nOpt
)
{
return
TSDB_CODE_QRY_SCH_NOT_EXIST
;
}
else
{
assert
(
0
);
}
}
return
TSDB_CODE_SUCCESS
;
}
static
FORCE_INLINE
void
qwReleaseScheduler
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
)
{
QW_UNLOCK
(
rwType
,
&
mgmt
->
schLock
);
}
static
FORCE_INLINE
int32_t
qwAcquireTask
(
int32_t
rwType
,
SQWorkerSchStatus
*
sch
,
uint64_t
queryId
,
uint64_t
taskId
,
SQWorker
TaskStatus
**
task
)
{
char
id
[
sizeof
(
q
ueryId
)
+
sizeof
(
task
Id
)]
=
{
0
};
QW_SET_QTID
(
id
,
q
ueryId
,
task
Id
);
static
int32_t
qwAcquireTaskImpl
(
int32_t
rwType
,
SQWSchStatus
*
sch
,
uint64_t
qId
,
uint64_t
tId
,
SQW
TaskStatus
**
task
)
{
char
id
[
sizeof
(
q
Id
)
+
sizeof
(
t
Id
)]
=
{
0
};
QW_SET_QTID
(
id
,
q
Id
,
t
Id
);
QW_LOCK
(
rwType
,
&
sch
->
tasksLock
);
*
task
=
taosHashGet
(
sch
->
tasksHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
task
))
{
QW_UNLOCK
(
rwType
,
&
sch
->
tasksLock
);
return
TSDB_CODE_QRY_TASK_NOT_EXIST
;
}
return
TSDB_CODE_SUCCESS
;
}
static
FORCE_INLINE
int32_t
qwInsertAndAcquireTask
(
int32_t
rwType
,
SQWorkerSchStatus
*
sch
,
uint64_t
queryId
,
uint64_t
taskId
,
int8_t
status
,
bool
*
inserted
,
SQWorkerTaskStatus
**
task
)
{
char
id
[
sizeof
(
queryId
)
+
sizeof
(
taskId
)]
=
{
0
};
QW_SET_QTID
(
id
,
queryId
,
taskId
);
static
int32_t
qwAcquireTask
(
int32_t
rwType
,
SQWSchStatus
*
sch
,
uint64_t
qId
,
uint64_t
tId
,
SQWTaskStatus
**
task
)
{
return
qwAcquireTaskImpl
(
rwType
,
sch
,
qId
,
tId
,
task
);
}
static
FORCE_INLINE
void
qwReleaseTask
(
int32_t
rwType
,
SQWSchStatus
*
sch
)
{
QW_UNLOCK
(
rwType
,
&
sch
->
tasksLock
);
}
int32_t
qwAddTaskToSch
(
int32_t
rwType
,
SQWSchStatus
*
sch
,
uint64_t
qId
,
uint64_t
tId
,
int8_t
status
,
int32_t
eOpt
,
SQWTaskStatus
**
task
)
{
int32_t
code
=
0
;
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
SQWTaskStatus
ntask
=
{
0
};
ntask
.
status
=
status
;
while
(
true
)
{
*
inserted
=
false
;
QW_LOCK
(
QW_WRITE
,
&
sch
->
tasksLock
);
int32_t
code
=
taosHashPut
(
sch
->
tasksHash
,
id
,
sizeof
(
id
),
&
status
,
sizeof
(
status
));
int32_t
code
=
taosHashPut
(
sch
->
tasksHash
,
id
,
sizeof
(
id
),
&
ntask
,
sizeof
(
ntask
));
if
(
0
!=
code
)
{
QW_UNLOCK
(
QW_WRITE
,
&
sch
->
tasksLock
);
if
(
HASH_NODE_EXIST
(
code
))
{
if
(
qwAcquireTask
(
rwType
,
sch
,
queryId
,
taskId
,
task
))
{
continue
;
if
(
QW_EXIST_ACQUIRE
==
eOpt
&&
rwType
&&
task
)
{
if
(
qwAcquireTask
(
rwType
,
sch
,
qId
,
tId
,
task
))
{
continue
;
}
}
else
if
(
QW_EXIST_RET_ERR
==
eOpt
)
{
return
TSDB_CODE_QRY_TASK_ALREADY_EXIST
;
}
else
{
assert
(
0
);
}
break
;
}
else
{
qError
(
"taosHashPut queryId[%"
PRIx64
"] taskId[%"
PRIx64
"] to scheduleHash failed"
,
q
ueryId
,
task
Id
);
qError
(
"taosHashPut queryId[%"
PRIx64
"] taskId[%"
PRIx64
"] to scheduleHash failed"
,
q
Id
,
t
Id
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
}
QW_UNLOCK
(
QW_WRITE
,
&
sch
->
tasksLock
);
*
inserted
=
true
;
if
(
TSDB_CODE_SUCCESS
==
qwAcquireTask
(
rwType
,
sch
,
queryId
,
taskId
,
task
))
{
return
TSDB_CODE_SUCCESS
;
if
(
rwType
&&
task
)
{
if
(
TSDB_CODE_SUCCESS
==
qwAcquireTask
(
rwType
,
sch
,
qId
,
tId
,
task
))
{
return
TSDB_CODE_SUCCESS
;
}
}
else
{
break
;
}
}
}
return
TSDB_CODE_SUCCESS
;
}
static
FORCE_INLINE
void
qwReleaseTask
(
int32_t
rwType
,
SQWorkerSchStatus
*
sch
)
{
QW_UNLOCK
(
rwType
,
&
sch
->
tasksLock
);
static
int32_t
qwAddTask
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int32_t
status
,
int32_t
eOpt
,
SQWSchStatus
**
sch
,
SQWTaskStatus
**
task
)
{
SQWSchStatus
*
tsch
=
NULL
;
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
sId
,
&
tsch
,
QW_NOT_EXIST_ADD
));
int32_t
code
=
qwAddTaskToSch
(
QW_READ
,
tsch
,
qId
,
tId
,
status
,
eOpt
,
task
);
if
(
code
)
{
qwReleaseScheduler
(
QW_WRITE
,
mgmt
);
}
if
(
NULL
==
task
)
{
qwReleaseScheduler
(
QW_READ
,
mgmt
);
}
else
if
(
sch
)
{
*
sch
=
tsch
;
}
QW_RET
(
code
);
}
static
FORCE_INLINE
int32_t
qwAcquireTaskResCache
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
queryId
,
uint64_t
taskId
,
SQWorkerResCache
**
res
)
{
char
id
[
sizeof
(
queryId
)
+
sizeof
(
taskId
)]
=
{
0
};
QW_SET_QTID
(
id
,
queryId
,
taskId
);
...
...
@@ -235,27 +272,24 @@ static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgm
}
int32_t
qwGetSchTasksStatus
(
SQWorkerMgmt
*
mgmt
,
uint64_t
s
cheduler
Id
,
SSchedulerStatusRsp
**
rsp
)
{
SQW
orkerSchStatus
*
schStatus
=
NULL
;
int32_t
qwGetSchTasksStatus
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SSchedulerStatusRsp
**
rsp
)
{
SQW
SchStatus
*
sch
=
NULL
;
int32_t
taskNum
=
0
;
if
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
schedulerId
,
&
schStatus
))
{
qWarn
(
"no scheduler for schedulerId[%"
PRIx64
"]"
,
schedulerId
);
}
else
{
schStatus
->
lastAccessTs
=
taosGetTimestampSec
();
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
sId
,
&
sch
,
QW_NOT_EXIST_RET_ERR
));
sch
->
lastAccessTs
=
taosGetTimestampSec
();
QW_LOCK
(
QW_READ
,
&
schStatus
->
tasksLock
);
taskNum
=
taosHashGetSize
(
schStatus
->
tasksHash
);
}
QW_LOCK
(
QW_READ
,
&
sch
->
tasksLock
);
taskNum
=
taosHashGetSize
(
sch
->
tasksHash
);
int32_t
size
=
sizeof
(
SSchedulerStatusRsp
)
+
sizeof
((
*
rsp
)
->
status
[
0
])
*
taskNum
;
*
rsp
=
calloc
(
1
,
size
);
if
(
NULL
==
*
rsp
)
{
qError
(
"calloc %d failed"
,
size
);
if
(
schStatus
)
{
QW_UNLOCK
(
QW_READ
,
&
schStatus
->
tasksLock
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
}
QW_UNLOCK
(
QW_READ
,
&
sch
->
tasksLock
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
...
...
@@ -264,23 +298,19 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler
size_t
keyLen
=
0
;
int32_t
i
=
0
;
if
(
schStatus
)
{
void
*
pIter
=
taosHashIterate
(
schStatus
->
tasksHash
,
NULL
);
while
(
pIter
)
{
SQWorkerTaskStatus
*
taskStatus
=
(
SQWorkerTaskStatus
*
)
pIter
;
taosHashGetKey
(
pIter
,
&
key
,
&
keyLen
);
void
*
pIter
=
taosHashIterate
(
sch
->
tasksHash
,
NULL
);
while
(
pIter
)
{
SQWTaskStatus
*
taskStatus
=
(
SQWTaskStatus
*
)
pIter
;
taosHashGetKey
(
pIter
,
&
key
,
&
keyLen
);
QW_GET_QTID
(
key
,
(
*
rsp
)
->
status
[
i
].
queryId
,
(
*
rsp
)
->
status
[
i
].
taskId
);
(
*
rsp
)
->
status
[
i
].
status
=
taskStatus
->
status
;
pIter
=
taosHashIterate
(
schStatus
->
tasksHash
,
pIter
);
}
}
QW_GET_QTID
(
key
,
(
*
rsp
)
->
status
[
i
].
queryId
,
(
*
rsp
)
->
status
[
i
].
taskId
);
(
*
rsp
)
->
status
[
i
].
status
=
taskStatus
->
status
;
pIter
=
taosHashIterate
(
sch
->
tasksHash
,
pIter
);
}
if
(
schStatus
)
{
QW_UNLOCK
(
QW_READ
,
&
schStatus
->
tasksLock
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
}
QW_UNLOCK
(
QW_READ
,
&
sch
->
tasksLock
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
(
*
rsp
)
->
num
=
taskNum
;
...
...
@@ -289,115 +319,81 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler
int32_t
qwUpdateSchLastAccess
(
SQWorkerMgmt
*
mgmt
,
uint64_t
s
cheduler
Id
)
{
SQW
orkerSchStatus
*
schStatus
=
NULL
;
int32_t
qwUpdateSchLastAccess
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
)
{
SQW
SchStatus
*
sch
=
NULL
;
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
chedulerId
,
&
schStatus
));
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
Id
,
&
sch
,
QW_NOT_EXIST_RET_ERR
));
sch
Status
->
lastAccessTs
=
taosGetTimestampSec
();
sch
->
lastAccessTs
=
taosGetTimestampSec
();
qwReleaseScheduler
(
QW_READ
,
mgmt
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwGetTaskStatus
(
SQWorkerMgmt
*
mgmt
,
uint64_t
schedulerId
,
uint64_t
queryId
,
uint64_t
taskId
,
int8_t
*
taskStatus
)
{
SQWorkerSchStatus
*
sch
=
NULL
;
SQWorkerTaskStatus
*
task
=
NULL
;
int32_t
qwUpdateTaskStatus
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int8_t
status
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
schedulerId
,
&
sch
));
QW_ERR_
JRET
(
qwAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
&
task
));
QW_ERR_
RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
sId
,
&
sch
,
QW_NOT_EXIST_RET_ERR
));
*
taskStatus
=
task
->
status
;
QW_ERR_JRET
(
qwAcquireTask
(
QW_READ
,
sch
,
qId
,
tId
,
&
task
))
;
QW_LOCK
(
QW_WRITE
,
&
task
->
lock
);
qwUpdateTaskInfo
(
task
,
QW_TASK_INFO_STATUS
,
&
status
);
QW_UNLOCK
(
QW_WRITE
,
&
task
->
lock
);
_return:
if
(
task
)
{
qwReleaseTask
(
QW_READ
,
sch
);
}
if
(
sch
)
{
qwReleaseScheduler
(
QW_READ
,
mgmt
);
}
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
QW_RET
(
code
);
}
int32_t
qw
SwitchTaskStatus
(
SQWorkerMgmt
*
mgmt
,
uint64_t
schedulerId
,
uint64_t
queryId
,
uint64_t
taskId
,
int8_t
taskStatus
)
{
SQW
orker
SchStatus
*
sch
=
NULL
;
SQW
orker
TaskStatus
*
task
=
NULL
;
int32_t
qw
GetTaskStatus
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
queryId
,
uint64_t
taskId
,
int8_t
*
taskStatus
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
bool
inserted
=
false
;
if
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
schedulerId
,
&
sch
))
{
if
(
qwCheckStatusSwitch
(
JOB_TASK_STATUS_NULL
,
taskStatus
))
{
qError
(
"switch status error, not start to %d"
,
taskStatus
);
QW_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
QW_ERR_RET
(
qwInsertAndAcquireScheduler
(
QW_READ
,
mgmt
,
schedulerId
,
&
sch
));
if
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
sId
,
&
sch
,
QW_NOT_EXIST_RET_ERR
))
{
*
taskStatus
=
JOB_TASK_STATUS_NULL
;
return
TSDB_CODE_SUCCESS
;
}
if
(
qwAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
&
task
))
{
if
(
qwCheckStatusSwitch
(
JOB_TASK_STATUS_NOT_START
,
taskStatus
))
{
qwReleaseScheduler
(
QW_READ
,
mgmt
);
qError
(
"switch status error, not start to %d"
,
taskStatus
);
QW_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
QW_ERR_JRET
(
qwInsertAndAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
taskStatus
,
&
inserted
,
&
task
));
if
(
inserted
)
{
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
return
TSDB_CODE_SUCCESS
;
}
QW_LOCK
(
QW_WRITE
,
&
task
->
lock
);
code
=
qwUpdateTaskInfo
(
task
,
QW_TASK_INFO_STATUS
,
&
taskStatus
);
QW_UNLOCK
(
QW_WRITE
,
&
task
->
lock
);
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
QW_RET
(
code
);
*
taskStatus
=
JOB_TASK_STATUS_NULL
;
return
TSDB_CODE_SUCCESS
;
}
QW_LOCK
(
QW_WRITE
,
&
task
->
lock
);
code
=
qwUpdateTaskInfo
(
task
,
QW_TASK_INFO_STATUS
,
&
taskStatus
);
QW_UNLOCK
(
QW_WRITE
,
&
task
->
lock
);
_return:
*
taskStatus
=
task
->
status
;
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
QW_RET
(
code
);
}
int32_t
qwCancelTask
(
SQWorkerMgmt
*
mgmt
,
uint64_t
s
cheduler
Id
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SQW
orker
SchStatus
*
sch
=
NULL
;
SQW
orker
TaskStatus
*
task
=
NULL
;
int32_t
qwCancelTask
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
if
(
TSDB_CODE_SUCCESS
!=
qwAcquireScheduler
(
QW_READ
,
mgmt
,
schedulerId
,
&
sch
))
{
QW_ERR_RET
(
qwSwitchTaskStatus
(
mgmt
,
schedulerId
,
queryId
,
taskId
,
JOB_TASK_STATUS_NOT_START
));
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
schedulerId
,
&
sch
));
}
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
sId
,
&
sch
,
QW_NOT_EXIST_ADD
));
if
(
qwAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
&
task
))
{
code
=
qwSwitchTaskStatus
(
mgmt
,
schedulerId
,
queryId
,
taskId
,
JOB_TASK_STATUS_NOT_START
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
code
=
qwAddTask
(
mgmt
,
sId
,
queryId
,
taskId
,
JOB_TASK_STATUS_NOT_START
,
QW_EXIST_ACQUIRE
,
&
sch
,
&
task
);
if
(
code
)
{
qwReleaseScheduler
(
QW_READ
,
mgmt
);
QW_ERR_RET
(
code
);
}
QW_ERR_JRET
(
qwAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
&
task
));
}
QW_LOCK
(
QW_WRITE
,
&
task
->
lock
);
...
...
@@ -423,6 +419,7 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId,
}
QW_UNLOCK
(
QW_WRITE
,
&
task
->
lock
);
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
...
...
@@ -449,9 +446,9 @@ _return:
int32_t
qwDropTask
(
SQWorkerMgmt
*
mgmt
,
uint64_t
s
cheduler
Id
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SQW
orker
SchStatus
*
sch
=
NULL
;
SQW
orker
TaskStatus
*
task
=
NULL
;
int32_t
qwDropTask
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
char
id
[
sizeof
(
queryId
)
+
sizeof
(
taskId
)]
=
{
0
};
QW_SET_QTID
(
id
,
queryId
,
taskId
);
...
...
@@ -462,15 +459,15 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u
}
QW_UNLOCK
(
QW_WRITE
,
&
mgmt
->
resLock
);
if
(
TSDB_CODE_SUCCESS
!=
qwAcquireScheduler
(
QW_WRITE
,
mgmt
,
s
chedulerId
,
&
sch
))
{
qWarn
(
"scheduler %"
PRIx64
" doesn't exist"
,
s
cheduler
Id
);
if
(
TSDB_CODE_SUCCESS
!=
qwAcquireScheduler
(
QW_WRITE
,
mgmt
,
s
Id
,
&
sch
,
QW_NOT_EXIST_RET_ERR
))
{
qWarn
(
"scheduler %"
PRIx64
" doesn't exist"
,
sId
);
return
TSDB_CODE_SUCCESS
;
}
if
(
qwAcquireTask
(
QW_WRITE
,
sch
,
queryId
,
taskId
,
&
task
))
{
qwReleaseScheduler
(
QW_WRITE
,
mgmt
);
qWarn
(
"scheduler %"
PRIx64
" queryId %"
PRIx64
" taskId:%"
PRIx64
" doesn't exist"
,
s
cheduler
Id
,
queryId
,
taskId
);
qWarn
(
"scheduler %"
PRIx64
" queryId %"
PRIx64
" taskId:%"
PRIx64
" doesn't exist"
,
sId
,
queryId
,
taskId
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -483,21 +480,21 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u
}
int32_t
qwCancelDropTask
(
SQWorkerMgmt
*
mgmt
,
uint64_t
s
cheduler
Id
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SQW
orker
SchStatus
*
sch
=
NULL
;
SQW
orker
TaskStatus
*
task
=
NULL
;
int32_t
qwCancelDropTask
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
if
(
TSDB_CODE_SUCCESS
!=
qwAcquireScheduler
(
QW_READ
,
mgmt
,
schedulerId
,
&
sch
))
{
qWarn
(
"scheduler %"
PRIx64
" doesn't exist"
,
schedulerId
);
return
TSDB_CODE_SUCCESS
;
}
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
sId
,
&
sch
,
QW_NOT_EXIST_ADD
));
if
(
qwAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
&
task
))
{
qwReleaseScheduler
(
QW_READ
,
mgmt
);
qWarn
(
"scheduler %"
PRIx64
" queryId %"
PRIx64
" taskId:%"
PRIx64
" doesn't exist"
,
schedulerId
,
queryId
,
taskId
);
return
TSDB_CODE_SUCCESS
;
code
=
qwAddTask
(
mgmt
,
sId
,
queryId
,
taskId
,
JOB_TASK_STATUS_NOT_START
,
QW_EXIST_ACQUIRE
,
&
sch
,
&
task
);
if
(
code
)
{
qwReleaseScheduler
(
QW_READ
,
mgmt
);
QW_ERR_RET
(
code
);
}
}
QW_LOCK
(
QW_WRITE
,
&
task
->
lock
);
...
...
@@ -508,7 +505,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer
int8_t
newStatus
=
0
;
if
(
task
->
status
==
JOB_TASK_STATUS_EXECUTING
)
{
newStatus
=
JOB_TASK_STATUS_
CANCELL
ING
;
newStatus
=
JOB_TASK_STATUS_
DROPP
ING
;
QW_ERR_JRET
(
qwUpdateTaskInfo
(
task
,
QW_TASK_INFO_STATUS
,
&
newStatus
));
}
else
if
(
task
->
status
==
JOB_TASK_STATUS_CANCELLING
||
task
->
status
==
JOB_TASK_STATUS_DROPPING
||
task
->
status
==
JOB_TASK_STATUS_NOT_START
)
{
QW_UNLOCK
(
QW_WRITE
,
&
task
->
lock
);
...
...
@@ -521,7 +518,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
QW_ERR_RET
(
qwDropTask
(
mgmt
,
s
cheduler
Id
,
queryId
,
taskId
));
QW_ERR_RET
(
qwDropTask
(
mgmt
,
sId
,
queryId
,
taskId
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -604,6 +601,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
}
SRpcMsg
rpcRsp
=
{
.
msgType
=
pMsg
->
msgType
+
1
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
...
...
@@ -673,12 +671,12 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) {
int32_t
qwCheckAndSendReadyRsp
(
SQWorkerMgmt
*
mgmt
,
uint64_t
s
cheduler
Id
,
uint64_t
queryId
,
uint64_t
taskId
,
SRpcMsg
*
pMsg
,
int32_t
rspCode
)
{
SQW
orker
SchStatus
*
sch
=
NULL
;
SQW
orker
TaskStatus
*
task
=
NULL
;
int32_t
qwCheckAndSendReadyRsp
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
queryId
,
uint64_t
taskId
,
SRpcMsg
*
pMsg
,
int32_t
rspCode
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
chedulerId
,
&
sch
));
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
Id
,
&
sch
,
QW_NOT_EXIST_RET_ERR
));
QW_ERR_JRET
(
qwAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
&
task
));
...
...
@@ -706,10 +704,8 @@ _return:
if
(
task
)
{
QW_UNLOCK
(
QW_WRITE
,
&
task
->
lock
);
}
if
(
sch
)
{
qwReleaseTask
(
QW_READ
,
sch
);
}
qwReleaseScheduler
(
QW_READ
,
mgmt
);
...
...
@@ -717,12 +713,12 @@ _return:
QW_RET
(
code
);
}
int32_t
qwSetAndSendReadyRsp
(
SQWorkerMgmt
*
mgmt
,
uint64_t
s
cheduler
Id
,
uint64_t
queryId
,
uint64_t
taskId
,
SRpcMsg
*
pMsg
)
{
SQW
orker
SchStatus
*
sch
=
NULL
;
SQW
orker
TaskStatus
*
task
=
NULL
;
int32_t
qwSetAndSendReadyRsp
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
queryId
,
uint64_t
taskId
,
SRpcMsg
*
pMsg
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
chedulerId
,
&
sch
));
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
Id
,
&
sch
,
QW_NOT_EXIST_RET_ERR
));
QW_ERR_JRET
(
qwAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
&
task
));
...
...
@@ -745,9 +741,6 @@ _return:
if
(
task
)
{
QW_UNLOCK
(
QW_WRITE
,
&
task
->
lock
);
}
if
(
sch
)
{
qwReleaseTask
(
QW_READ
,
sch
);
}
...
...
@@ -756,15 +749,15 @@ _return:
QW_RET
(
code
);
}
int32_t
qwCheckTaskCancelDrop
(
SQWorkerMgmt
*
mgmt
,
uint64_t
s
cheduler
Id
,
uint64_t
queryId
,
uint64_t
taskId
,
bool
*
needStop
)
{
SQW
orker
SchStatus
*
sch
=
NULL
;
SQW
orker
TaskStatus
*
task
=
NULL
;
int32_t
qwCheckTaskCancelDrop
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
queryId
,
uint64_t
taskId
,
bool
*
needStop
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
int8_t
status
=
JOB_TASK_STATUS_CANCELLED
;
*
needStop
=
false
;
if
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
chedulerId
,
&
sch
))
{
if
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
Id
,
&
sch
,
QW_NOT_EXIST_RET_ERR
))
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -776,11 +769,13 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_
QW_LOCK
(
QW_READ
,
&
task
->
lock
);
if
((
!
task
->
cancel
)
&&
(
!
task
->
drop
))
{
qError
(
"no cancel or drop, but task:%"
PRIx64
" exists"
,
taskId
);
QW_UNLOCK
(
QW_READ
,
&
task
->
lock
);
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
return
TSDB_CODE_SUCCESS
;
QW_RET
(
TSDB_CODE_QRY_APP_ERROR
)
;
}
QW_UNLOCK
(
QW_READ
,
&
task
->
lock
);
...
...
@@ -791,30 +786,40 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_
QW_LOCK
(
QW_WRITE
,
&
task
->
lock
);
qwUpdateTaskInfo
(
task
,
QW_TASK_INFO_STATUS
,
&
status
);
QW_UNLOCK
(
QW_WRITE
,
&
task
->
lock
);
}
else
if
(
task
->
drop
)
{
}
if
(
task
->
drop
)
{
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
qwDropTask
(
mgmt
,
scheduler
Id
,
queryId
,
taskId
);
return
qwDropTask
(
mgmt
,
s
Id
,
queryId
,
taskId
);
}
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwHandleFetch
(
SQWorkerResCache
*
res
,
SQWorkerMgmt
*
mgmt
,
uint64_t
s
cheduler
Id
,
uint64_t
queryId
,
uint64_t
taskId
,
SRpcMsg
*
pMsg
)
{
SQW
orker
SchStatus
*
sch
=
NULL
;
SQW
orker
TaskStatus
*
task
=
NULL
;
int32_t
qwHandleFetch
(
SQWorkerResCache
*
res
,
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
queryId
,
uint64_t
taskId
,
SRpcMsg
*
pMsg
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
int32_t
needRsp
=
true
;
void
*
data
=
NULL
;
QW_ERR_JRET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
chedulerId
,
&
sch
));
QW_ERR_JRET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
Id
,
&
sch
,
QW_NOT_EXIST_RET_ERR
));
QW_ERR_JRET
(
qwAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
&
task
));
QW_LOCK
(
QW_READ
,
&
task
->
lock
);
if
(
task
->
status
!=
JOB_TASK_STATUS_EXECUTING
&&
task
->
status
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
&&
task
->
status
!=
JOB_TASK_STATUS_SUCCEED
)
{
if
(
task
->
cancel
||
task
->
drop
)
{
qError
(
"task is already cancelled or dropped"
);
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
task
->
status
!=
JOB_TASK_STATUS_EXECUTING
&&
task
->
status
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
qError
(
"invalid status %d for fetch"
,
task
->
status
);
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -840,10 +845,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
_return:
if
(
task
)
{
QW_UNLOCK
(
QW_READ
,
&
task
->
lock
);
qwReleaseTask
(
QW_READ
,
sch
);
}
if
(
sch
)
{
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
}
...
...
@@ -854,37 +859,46 @@ _return:
QW_RET
(
code
);
}
int32_t
qwQueryPostProcess
(
SQWorkerMgmt
*
mgmt
,
uint64_t
s
chedulerId
,
uint64_t
queryId
,
uint64_t
task
Id
,
int8_t
status
,
int32_t
errCode
)
{
SQW
orker
SchStatus
*
sch
=
NULL
;
SQW
orker
TaskStatus
*
task
=
NULL
;
int32_t
qwQueryPostProcess
(
SQWorkerMgmt
*
mgmt
,
uint64_t
s
Id
,
uint64_t
qId
,
uint64_t
t
Id
,
int8_t
status
,
int32_t
errCode
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
int8_t
newStatus
=
JOB_TASK_STATUS_CANCELLED
;
code
=
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
chedulerId
,
&
sch
);
code
=
qwAcquireScheduler
(
QW_READ
,
mgmt
,
s
Id
,
&
sch
,
QW_NOT_EXIST_ADD
);
if
(
code
)
{
qError
(
"s
chedulerId:%"
PRIx64
" not in cache"
,
scheduler
Id
);
qError
(
"s
Id:%"
PRIx64
" not in cache"
,
s
Id
);
QW_ERR_RET
(
code
);
}
code
=
qwAcquireTask
(
QW_READ
,
sch
,
q
ueryId
,
task
Id
,
&
task
);
code
=
qwAcquireTask
(
QW_READ
,
sch
,
q
Id
,
t
Id
,
&
task
);
if
(
code
)
{
qwReleaseScheduler
(
QW_READ
,
mgmt
);
qError
(
"schedulerId:%"
PRIx64
" queryId:%"
PRIx64
" taskId:%"
PRIx64
" not in cache"
,
schedulerId
,
queryId
,
taskId
);
QW_ERR_RET
(
code
);
if
(
JOB_TASK_STATUS_PARTIAL_SUCCEED
==
status
||
JOB_TASK_STATUS_SUCCEED
==
status
)
{
qError
(
"sId:%"
PRIx64
" queryId:%"
PRIx64
" taskId:%"
PRIx64
" not in cache"
,
sId
,
qId
,
tId
);
QW_ERR_RET
(
code
);
}
QW_ERR_RET
(
qwAddTask
(
mgmt
,
sId
,
qId
,
tId
,
status
,
QW_EXIST_ACQUIRE
,
&
sch
,
&
task
));
}
if
(
task
->
cancel
)
{
QW_LOCK
(
QW_WRITE
,
&
task
->
lock
);
qwUpdateTaskInfo
(
task
,
QW_TASK_INFO_STATUS
,
&
newStatus
);
QW_UNLOCK
(
QW_WRITE
,
&
task
->
lock
);
}
else
if
(
task
->
drop
)
{
}
if
(
task
->
drop
)
{
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
qwDropTask
(
mgmt
,
s
chedulerId
,
queryId
,
task
Id
);
qwDropTask
(
mgmt
,
s
Id
,
qId
,
t
Id
);
return
TSDB_CODE_SUCCESS
;
}
else
{
}
if
(
!
(
task
->
cancel
||
task
->
drop
))
{
QW_LOCK
(
QW_WRITE
,
&
task
->
lock
);
qwUpdateTaskInfo
(
task
,
QW_TASK_INFO_STATUS
,
&
status
);
task
->
code
=
errCode
;
...
...
@@ -938,24 +952,24 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
qError
(
"invalid query msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
QW_ERR_
J
RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
s
chedulerId
=
htobe64
(
msg
->
scheduler
Id
);
msg
->
s
Id
=
htobe64
(
msg
->
s
Id
);
msg
->
queryId
=
htobe64
(
msg
->
queryId
);
msg
->
taskId
=
htobe64
(
msg
->
taskId
);
msg
->
contentLen
=
ntohl
(
msg
->
contentLen
);
bool
queryDone
=
false
;
bool
queryRsp
=
false
;
bool
queryRsp
ed
=
false
;
bool
needStop
=
false
;
SSubplan
*
plan
=
NULL
;
int32_t
code
=
0
;
QW_ERR_JRET
(
qwCheckTaskCancelDrop
(
qWorkerMgmt
,
msg
->
s
cheduler
Id
,
msg
->
queryId
,
msg
->
taskId
,
&
needStop
));
QW_ERR_JRET
(
qwCheckTaskCancelDrop
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
&
needStop
));
if
(
needStop
)
{
qWarn
(
"task need stop"
);
QW_ERR_JRET
(
TSDB_CODE_QRY_TASK_CANCELLED
);
...
...
@@ -963,7 +977,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
code
=
qStringToSubplan
(
msg
->
msg
,
&
plan
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
qError
(
"schId:%"
PRIx64
",qId:%"
PRIx64
",taskId:%"
PRIx64
" string to subplan failed, code:%d"
,
msg
->
s
cheduler
Id
,
msg
->
queryId
,
msg
->
taskId
,
code
);
qError
(
"schId:%"
PRIx64
",qId:%"
PRIx64
",taskId:%"
PRIx64
" string to subplan failed, code:%d"
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
code
);
QW_ERR_JRET
(
code
);
}
...
...
@@ -974,12 +988,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if
(
code
)
{
QW_ERR_JRET
(
code
);
}
else
{
QW_ERR_JRET
(
qw
SwitchTaskStatus
(
qWorkerMgmt
,
msg
->
schedulerId
,
msg
->
queryId
,
msg
->
taskId
,
JOB_TASK_STATUS_EXECUTING
));
QW_ERR_JRET
(
qw
AddTask
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
JOB_TASK_STATUS_EXECUTING
,
QW_EXIST_RET_ERR
,
NULL
,
NULL
));
}
QW_ERR_JRET
(
qwBuildAndSendQueryRsp
(
pMsg
,
TSDB_CODE_SUCCESS
));
queryRsp
=
true
;
queryRsp
ed
=
true
;
//TODO call executer to execute subquery
code
=
0
;
...
...
@@ -990,29 +1004,29 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if
(
code
)
{
QW_ERR_JRET
(
code
);
}
else
{
QW_ERR_JRET
(
qwAddTaskRes
ult
(
qWorkerMgmt
,
msg
->
queryId
,
msg
->
taskId
,
data
));
QW_ERR_JRET
(
qwAddTaskRes
Cache
(
qWorkerMgmt
,
msg
->
queryId
,
msg
->
taskId
,
data
));
QW_ERR_JRET
(
qw
SwitchTaskStatus
(
qWorkerMgmt
,
msg
->
scheduler
Id
,
msg
->
queryId
,
msg
->
taskId
,
JOB_TASK_STATUS_PARTIAL_SUCCEED
));
}
QW_ERR_JRET
(
qw
UpdateTaskStatus
(
qWorkerMgmt
,
msg
->
s
Id
,
msg
->
queryId
,
msg
->
taskId
,
JOB_TASK_STATUS_PARTIAL_SUCCEED
));
}
_return:
if
(
queryRsp
)
{
code
=
qwCheckAndSendReadyRsp
(
qWorkerMgmt
,
msg
->
s
cheduler
Id
,
msg
->
queryId
,
msg
->
taskId
,
pMsg
,
code
);
if
(
queryRsp
ed
)
{
code
=
qwCheckAndSendReadyRsp
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
pMsg
,
code
);
}
else
{
code
=
qwBuildAndSendQueryRsp
(
pMsg
,
code
);
}
int8_t
status
=
0
;
if
(
TSDB_CODE_SUCCESS
!=
code
||
queryDone
)
{
if
(
code
)
{
status
=
JOB_TASK_STATUS_FAILED
;
//TODO set CANCELLED from code
}
else
{
status
=
JOB_TASK_STATUS_SUCCEED
;
}
qwQueryPostProcess
(
qWorkerMgmt
,
msg
->
schedulerId
,
msg
->
queryId
,
msg
->
taskId
,
status
,
code
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
status
=
JOB_TASK_STATUS_FAILED
;
}
else
if
(
queryDone
)
{
status
=
JOB_TASK_STATUS_SUCCEED
;
}
else
{
status
=
JOB_TASK_STATUS_PARTIAL_SUCCEED
;
}
qwQueryPostProcess
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
status
,
code
);
QW_RET
(
code
);
}
...
...
@@ -1023,12 +1037,16 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
}
SResReadyMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
=
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task status msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
QW_ERR_RET
(
qwSetAndSendReadyRsp
(
qWorkerMgmt
,
msg
->
schedulerId
,
msg
->
queryId
,
msg
->
taskId
,
pMsg
));
msg
->
sId
=
htobe64
(
msg
->
sId
);
msg
->
queryId
=
htobe64
(
msg
->
queryId
);
msg
->
taskId
=
htobe64
(
msg
->
taskId
);
QW_ERR_RET
(
qwSetAndSendReadyRsp
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
pMsg
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1040,14 +1058,16 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
SSchTasksStatusMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
=
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task status msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
htobe64
(
msg
->
sId
);
SSchedulerStatusRsp
*
sStatus
=
NULL
;
QW_ERR_JRET
(
qwGetSchTasksStatus
(
qWorkerMgmt
,
msg
->
s
cheduler
Id
,
&
sStatus
));
QW_ERR_JRET
(
qwGetSchTasksStatus
(
qWorkerMgmt
,
msg
->
sId
,
&
sStatus
));
_return:
...
...
@@ -1062,11 +1082,15 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
SResFetchMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
=
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
QW_ERR_RET
(
qwUpdateSchLastAccess
(
qWorkerMgmt
,
msg
->
schedulerId
));
msg
->
sId
=
htobe64
(
msg
->
sId
);
msg
->
queryId
=
htobe64
(
msg
->
queryId
);
msg
->
taskId
=
htobe64
(
msg
->
taskId
);
QW_ERR_RET
(
qwUpdateSchLastAccess
(
qWorkerMgmt
,
msg
->
sId
));
void
*
data
=
NULL
;
SQWorkerResCache
*
res
=
NULL
;
...
...
@@ -1074,7 +1098,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET
(
qwAcquireTaskResCache
(
QW_READ
,
qWorkerMgmt
,
msg
->
queryId
,
msg
->
taskId
,
&
res
));
QW_ERR_JRET
(
qwHandleFetch
(
res
,
qWorkerMgmt
,
msg
->
s
cheduler
Id
,
msg
->
queryId
,
msg
->
taskId
,
pMsg
));
QW_ERR_JRET
(
qwHandleFetch
(
res
,
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
pMsg
));
_return:
...
...
@@ -1090,12 +1114,16 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
STaskCancelMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
=
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task cancel msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
QW_ERR_JRET
(
qwCancelTask
(
qWorkerMgmt
,
msg
->
schedulerId
,
msg
->
queryId
,
msg
->
taskId
));
msg
->
sId
=
htobe64
(
msg
->
sId
);
msg
->
queryId
=
htobe64
(
msg
->
queryId
);
msg
->
taskId
=
htobe64
(
msg
->
taskId
);
QW_ERR_JRET
(
qwCancelTask
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
));
_return:
...
...
@@ -1111,12 +1139,16 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
STaskDropMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
=
sizeof
(
*
msg
))
{
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task drop msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
QW_ERR_JRET
(
qwCancelDropTask
(
qWorkerMgmt
,
msg
->
schedulerId
,
msg
->
queryId
,
msg
->
taskId
));
msg
->
sId
=
htobe64
(
msg
->
sId
);
msg
->
queryId
=
htobe64
(
msg
->
queryId
);
msg
->
taskId
=
htobe64
(
msg
->
taskId
);
QW_ERR_JRET
(
qwCancelDropTask
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
));
_return:
...
...
@@ -1125,6 +1157,31 @@ _return:
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerContinueQuery
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
bool
queryDone
=
false
;
uint64_t
sId
,
qId
,
tId
;
//TODO call executer to continue execute subquery
code
=
0
;
void
*
data
=
NULL
;
queryDone
=
false
;
//TODO call executer to continue execute subquery
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
status
=
JOB_TASK_STATUS_FAILED
;
}
else
if
(
queryDone
)
{
status
=
JOB_TASK_STATUS_SUCCEED
;
}
else
{
status
=
JOB_TASK_STATUS_PARTIAL_SUCCEED
;
}
code
=
qwQueryPostProcess
(
qWorkerMgmt
,
sId
,
qId
,
tId
,
status
,
code
);
QW_RET
(
code
);
}
void
qWorkerDestroy
(
void
**
qWorkerMgmt
)
{
if
(
NULL
==
qWorkerMgmt
||
NULL
==
*
qWorkerMgmt
)
{
...
...
source/libs/qworker/test/qworkerTests.cpp
浏览文件 @
3c0a7e63
...
...
@@ -36,10 +36,25 @@
namespace
{
bool
testStop
=
false
;
int32_t
qwtStringToPlan
(
const
char
*
str
,
SSubplan
**
subplan
)
{
return
0
;
}
void
qwtRpcSendResponse
(
const
SRpcMsg
*
pRsp
)
{
if
(
TDMT_VND_TASKS_STATUS_RSP
==
pRsp
->
msgType
)
{
SSchedulerStatusRsp
*
rsp
=
(
SSchedulerStatusRsp
*
)
pRsp
->
pCont
;
printf
(
"task num:%d
\n
"
,
rsp
->
num
);
for
(
int32_t
i
=
0
;
i
<
rsp
->
num
;
++
i
)
{
STaskStatus
*
task
=
&
rsp
->
status
[
i
];
printf
(
"qId:%"
PRIx64
",tId:%"
PRIx64
",status:%d
\n
"
,
task
->
queryId
,
task
->
taskId
,
task
->
status
);
}
}
return
;
}
void
stubSetStringToPlan
()
{
static
Stub
stub
;
...
...
@@ -54,11 +69,148 @@ void stubSetStringToPlan() {
}
}
void
stubSetRpcSendResponse
()
{
static
Stub
stub
;
stub
.
set
(
rpcSendResponse
,
qwtRpcSendResponse
);
{
AddrAny
any
(
"libplanner.so"
);
std
::
map
<
std
::
string
,
void
*>
result
;
any
.
get_global_func_addr_dynsym
(
"^rpcSendResponse$"
,
result
);
for
(
const
auto
&
f
:
result
)
{
stub
.
set
(
f
.
second
,
qwtRpcSendResponse
);
}
}
}
void
*
queryThread
(
void
*
param
)
{
SRpcMsg
queryRpc
=
{
0
};
int32_t
code
=
0
;
uint32_t
n
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
void
*
mgmt
=
param
;
SSubQueryMsg
*
queryMsg
=
(
SSubQueryMsg
*
)
calloc
(
1
,
sizeof
(
SSubQueryMsg
)
+
100
);
queryMsg
->
queryId
=
htobe64
(
1
);
queryMsg
->
sId
=
htobe64
(
1
);
queryMsg
->
taskId
=
htobe64
(
1
);
queryMsg
->
contentLen
=
htonl
(
100
);
queryRpc
.
pCont
=
queryMsg
;
queryRpc
.
contLen
=
sizeof
(
SSubQueryMsg
)
+
100
;
while
(
!
testStop
)
{
qWorkerProcessQueryMsg
(
mockPointer
,
mgmt
,
&
queryRpc
);
usleep
(
rand
()
%
5
);
if
(
++
n
%
50000
==
0
)
{
printf
(
"query:%d
\n
"
,
n
);
}
}
return
NULL
;
}
void
*
readyThread
(
void
*
param
)
{
SRpcMsg
readyRpc
=
{
0
};
int32_t
code
=
0
;
uint32_t
n
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
void
*
mgmt
=
param
;
SResReadyMsg
readyMsg
=
{
0
};
readyMsg
.
sId
=
htobe64
(
1
);
readyMsg
.
queryId
=
htobe64
(
1
);
readyMsg
.
taskId
=
htobe64
(
1
);
readyRpc
.
pCont
=
&
readyMsg
;
readyRpc
.
contLen
=
sizeof
(
SResReadyMsg
);
while
(
!
testStop
)
{
code
=
qWorkerProcessReadyMsg
(
mockPointer
,
mgmt
,
&
readyRpc
);
usleep
(
rand
()
%
5
);
if
(
++
n
%
50000
==
0
)
{
printf
(
"ready:%d
\n
"
,
n
);
}
}
return
NULL
;
}
void
*
fetchThread
(
void
*
param
)
{
SRpcMsg
fetchRpc
=
{
0
};
int32_t
code
=
0
;
uint32_t
n
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
void
*
mgmt
=
param
;
SResFetchMsg
fetchMsg
=
{
0
};
fetchMsg
.
sId
=
htobe64
(
1
);
fetchMsg
.
queryId
=
htobe64
(
1
);
fetchMsg
.
taskId
=
htobe64
(
1
);
fetchRpc
.
pCont
=
&
fetchMsg
;
fetchRpc
.
contLen
=
sizeof
(
SResFetchMsg
);
while
(
!
testStop
)
{
code
=
qWorkerProcessFetchMsg
(
mockPointer
,
mgmt
,
&
fetchRpc
);
usleep
(
rand
()
%
5
);
if
(
++
n
%
50000
==
0
)
{
printf
(
"fetch:%d
\n
"
,
n
);
}
}
return
NULL
;
}
void
*
dropThread
(
void
*
param
)
{
SRpcMsg
dropRpc
=
{
0
};
int32_t
code
=
0
;
uint32_t
n
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
void
*
mgmt
=
param
;
STaskDropMsg
dropMsg
=
{
0
};
dropMsg
.
sId
=
htobe64
(
1
);
dropMsg
.
queryId
=
htobe64
(
1
);
dropMsg
.
taskId
=
htobe64
(
1
);
dropRpc
.
pCont
=
&
dropMsg
;
dropRpc
.
contLen
=
sizeof
(
STaskDropMsg
);
while
(
!
testStop
)
{
code
=
qWorkerProcessDropMsg
(
mockPointer
,
mgmt
,
&
dropRpc
);
usleep
(
rand
()
%
5
);
if
(
++
n
%
50000
==
0
)
{
printf
(
"drop:%d
\n
"
,
n
);
}
}
return
NULL
;
}
void
*
statusThread
(
void
*
param
)
{
SRpcMsg
statusRpc
=
{
0
};
int32_t
code
=
0
;
uint32_t
n
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
void
*
mgmt
=
param
;
SSchTasksStatusMsg
statusMsg
=
{
0
};
statusMsg
.
sId
=
htobe64
(
1
);
statusRpc
.
pCont
=
&
statusMsg
;
statusRpc
.
contLen
=
sizeof
(
SSchTasksStatusMsg
);
statusRpc
.
msgType
=
TDMT_VND_TASKS_STATUS
;
while
(
!
testStop
)
{
statusMsg
.
sId
=
htobe64
(
1
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
usleep
(
rand
()
%
5
);
if
(
++
n
%
50000
==
0
)
{
printf
(
"status:%d
\n
"
,
n
);
}
}
return
NULL
;
}
}
TEST
(
testCase
,
normalCase
)
{
TEST
(
seqTest
,
normalCase
)
{
void
*
mgmt
=
NULL
;
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
...
...
@@ -66,48 +218,254 @@ TEST(testCase, normalCase) {
SRpcMsg
readyRpc
=
{
0
};
SRpcMsg
fetchRpc
=
{
0
};
SRpcMsg
dropRpc
=
{
0
};
SRpcMsg
statusRpc
=
{
0
};
SSubQueryMsg
*
queryMsg
=
(
SSubQueryMsg
*
)
calloc
(
1
,
sizeof
(
SSubQueryMsg
)
+
100
);
queryMsg
->
queryId
=
htobe64
(
1
);
queryMsg
->
s
cheduler
Id
=
htobe64
(
1
);
queryMsg
->
sId
=
htobe64
(
1
);
queryMsg
->
taskId
=
htobe64
(
1
);
queryMsg
->
contentLen
=
htonl
(
100
);
queryRpc
.
pCont
=
queryMsg
;
queryRpc
.
contLen
=
sizeof
(
SSubQueryMsg
)
+
100
;
SResReadyMsg
readyMsg
=
{
0
};
readyMsg
.
s
cheduler
Id
=
htobe64
(
1
);
readyMsg
.
sId
=
htobe64
(
1
);
readyMsg
.
queryId
=
htobe64
(
1
);
readyMsg
.
taskId
=
htobe64
(
1
);
readyRpc
.
pCont
=
&
readyMsg
;
readyRpc
.
contLen
=
sizeof
(
SResReadyMsg
);
SResFetchMsg
fetchMsg
=
{
0
};
fetchMsg
.
s
cheduler
Id
=
htobe64
(
1
);
fetchMsg
.
sId
=
htobe64
(
1
);
fetchMsg
.
queryId
=
htobe64
(
1
);
fetchMsg
.
taskId
=
htobe64
(
1
);
fetchRpc
.
pCont
=
&
fetchMsg
;
fetchRpc
.
contLen
=
sizeof
(
SResFetchMsg
);
STaskDropMsg
dropMsg
=
{
0
};
dropMsg
.
s
cheduler
Id
=
htobe64
(
1
);
dropMsg
.
sId
=
htobe64
(
1
);
dropMsg
.
queryId
=
htobe64
(
1
);
dropMsg
.
taskId
=
htobe64
(
1
);
dropRpc
.
pCont
=
&
dropMsg
;
dropRpc
.
contLen
=
sizeof
(
STaskDropMsg
);
SSchTasksStatusMsg
statusMsg
=
{
0
};
statusMsg
.
sId
=
htobe64
(
1
);
statusRpc
.
pCont
=
&
statusMsg
;
statusRpc
.
contLen
=
sizeof
(
SSchTasksStatusMsg
);
statusRpc
.
msgType
=
TDMT_VND_TASKS_STATUS
;
stubSetStringToPlan
();
stubSetRpcSendResponse
();
code
=
qWorkerInit
(
NULL
,
&
mgmt
);
ASSERT_EQ
(
code
,
0
);
statusMsg
.
sId
=
htobe64
(
1
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessQueryMsg
(
mockPointer
,
mgmt
,
&
queryRpc
);
ASSERT_EQ
(
code
,
0
);
statusMsg
.
sId
=
htobe64
(
1
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessReadyMsg
(
mockPointer
,
mgmt
,
&
readyRpc
);
ASSERT_EQ
(
code
,
0
);
statusMsg
.
sId
=
htobe64
(
1
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessFetchMsg
(
mockPointer
,
mgmt
,
&
fetchRpc
);
ASSERT_EQ
(
code
,
0
);
statusMsg
.
sId
=
htobe64
(
1
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessDropMsg
(
mockPointer
,
mgmt
,
&
dropRpc
);
ASSERT_EQ
(
code
,
0
);
statusMsg
.
sId
=
htobe64
(
1
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
qWorkerDestroy
(
&
mgmt
);
}
TEST
(
seqTest
,
cancelFirst
)
{
void
*
mgmt
=
NULL
;
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SRpcMsg
queryRpc
=
{
0
};
SRpcMsg
dropRpc
=
{
0
};
SRpcMsg
statusRpc
=
{
0
};
SSubQueryMsg
*
queryMsg
=
(
SSubQueryMsg
*
)
calloc
(
1
,
sizeof
(
SSubQueryMsg
)
+
100
);
queryMsg
->
queryId
=
htobe64
(
1
);
queryMsg
->
sId
=
htobe64
(
1
);
queryMsg
->
taskId
=
htobe64
(
1
);
queryMsg
->
contentLen
=
htonl
(
100
);
queryRpc
.
pCont
=
queryMsg
;
queryRpc
.
contLen
=
sizeof
(
SSubQueryMsg
)
+
100
;
STaskDropMsg
dropMsg
=
{
0
};
dropMsg
.
sId
=
htobe64
(
1
);
dropMsg
.
queryId
=
htobe64
(
1
);
dropMsg
.
taskId
=
htobe64
(
1
);
dropRpc
.
pCont
=
&
dropMsg
;
dropRpc
.
contLen
=
sizeof
(
STaskDropMsg
);
SSchTasksStatusMsg
statusMsg
=
{
0
};
statusMsg
.
sId
=
htobe64
(
1
);
statusRpc
.
pCont
=
&
statusMsg
;
statusRpc
.
contLen
=
sizeof
(
SSchTasksStatusMsg
);
statusRpc
.
msgType
=
TDMT_VND_TASKS_STATUS
;
stubSetStringToPlan
();
stubSetRpcSendResponse
();
code
=
qWorkerInit
(
NULL
,
&
mgmt
);
ASSERT_EQ
(
code
,
0
);
statusMsg
.
sId
=
htobe64
(
1
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessDropMsg
(
mockPointer
,
mgmt
,
&
dropRpc
);
ASSERT_EQ
(
code
,
0
);
statusMsg
.
sId
=
htobe64
(
1
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
code
=
qWorkerProcessQueryMsg
(
mockPointer
,
mgmt
,
&
queryRpc
);
ASSERT_EQ
(
code
,
0
);
statusMsg
.
sId
=
htobe64
(
1
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
qWorkerDestroy
(
&
mgmt
);
}
TEST
(
seqTest
,
randCase
)
{
void
*
mgmt
=
NULL
;
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
SRpcMsg
queryRpc
=
{
0
};
SRpcMsg
readyRpc
=
{
0
};
SRpcMsg
fetchRpc
=
{
0
};
SRpcMsg
dropRpc
=
{
0
};
SRpcMsg
statusRpc
=
{
0
};
SSubQueryMsg
*
queryMsg
=
(
SSubQueryMsg
*
)
calloc
(
1
,
sizeof
(
SSubQueryMsg
)
+
100
);
queryMsg
->
queryId
=
htobe64
(
1
);
queryMsg
->
sId
=
htobe64
(
1
);
queryMsg
->
taskId
=
htobe64
(
1
);
queryMsg
->
contentLen
=
htonl
(
100
);
queryRpc
.
pCont
=
queryMsg
;
queryRpc
.
contLen
=
sizeof
(
SSubQueryMsg
)
+
100
;
SResReadyMsg
readyMsg
=
{
0
};
readyMsg
.
sId
=
htobe64
(
1
);
readyMsg
.
queryId
=
htobe64
(
1
);
readyMsg
.
taskId
=
htobe64
(
1
);
readyRpc
.
pCont
=
&
readyMsg
;
readyRpc
.
contLen
=
sizeof
(
SResReadyMsg
);
SResFetchMsg
fetchMsg
=
{
0
};
fetchMsg
.
sId
=
htobe64
(
1
);
fetchMsg
.
queryId
=
htobe64
(
1
);
fetchMsg
.
taskId
=
htobe64
(
1
);
fetchRpc
.
pCont
=
&
fetchMsg
;
fetchRpc
.
contLen
=
sizeof
(
SResFetchMsg
);
STaskDropMsg
dropMsg
=
{
0
};
dropMsg
.
sId
=
htobe64
(
1
);
dropMsg
.
queryId
=
htobe64
(
1
);
dropMsg
.
taskId
=
htobe64
(
1
);
dropRpc
.
pCont
=
&
dropMsg
;
dropRpc
.
contLen
=
sizeof
(
STaskDropMsg
);
SSchTasksStatusMsg
statusMsg
=
{
0
};
statusMsg
.
sId
=
htobe64
(
1
);
statusRpc
.
pCont
=
&
statusMsg
;
statusRpc
.
contLen
=
sizeof
(
SSchTasksStatusMsg
);
statusRpc
.
msgType
=
TDMT_VND_TASKS_STATUS
;
stubSetStringToPlan
();
stubSetRpcSendResponse
();
srand
(
time
(
NULL
));
code
=
qWorkerInit
(
NULL
,
&
mgmt
);
ASSERT_EQ
(
code
,
0
);
int32_t
t
=
0
;
int32_t
maxr
=
10001
;
while
(
true
)
{
int32_t
r
=
rand
()
%
maxr
;
if
(
r
>=
0
&&
r
<
maxr
/
5
)
{
printf
(
"Query,%d
\n
"
,
t
++
);
code
=
qWorkerProcessQueryMsg
(
mockPointer
,
mgmt
,
&
queryRpc
);
}
else
if
(
r
>=
maxr
/
5
&&
r
<
maxr
*
2
/
5
)
{
printf
(
"Ready,%d
\n
"
,
t
++
);
code
=
qWorkerProcessReadyMsg
(
mockPointer
,
mgmt
,
&
readyRpc
);
}
else
if
(
r
>=
maxr
*
2
/
5
&&
r
<
maxr
*
3
/
5
)
{
printf
(
"Fetch,%d
\n
"
,
t
++
);
code
=
qWorkerProcessFetchMsg
(
mockPointer
,
mgmt
,
&
fetchRpc
);
}
else
if
(
r
>=
maxr
*
3
/
5
&&
r
<
maxr
*
4
/
5
)
{
printf
(
"Drop,%d
\n
"
,
t
++
);
code
=
qWorkerProcessDropMsg
(
mockPointer
,
mgmt
,
&
dropRpc
);
}
else
if
(
r
>=
maxr
*
4
/
5
&&
r
<
maxr
-
1
)
{
printf
(
"Status,%d
\n
"
,
t
++
);
statusMsg
.
sId
=
htobe64
(
1
);
code
=
qWorkerProcessStatusMsg
(
mockPointer
,
mgmt
,
&
statusRpc
);
ASSERT_EQ
(
code
,
0
);
}
else
{
printf
(
"QUIT RAND NOW"
);
break
;
}
}
qWorkerDestroy
(
&
mgmt
);
}
TEST
(
seqTest
,
multithreadRand
)
{
void
*
mgmt
=
NULL
;
int32_t
code
=
0
;
void
*
mockPointer
=
(
void
*
)
0x1
;
stubSetStringToPlan
();
stubSetRpcSendResponse
();
srand
(
time
(
NULL
));
code
=
qWorkerInit
(
NULL
,
&
mgmt
);
ASSERT_EQ
(
code
,
0
);
pthread_attr_t
thattr
;
pthread_attr_init
(
&
thattr
);
pthread_t
t1
,
t2
,
t3
,
t4
,
t5
;
pthread_create
(
&
(
t1
),
&
thattr
,
queryThread
,
mgmt
);
pthread_create
(
&
(
t2
),
&
thattr
,
readyThread
,
NULL
);
pthread_create
(
&
(
t3
),
&
thattr
,
fetchThread
,
NULL
);
pthread_create
(
&
(
t4
),
&
thattr
,
dropThread
,
NULL
);
pthread_create
(
&
(
t5
),
&
thattr
,
statusThread
,
NULL
);
int32_t
t
=
0
;
int32_t
maxr
=
10001
;
sleep
(
300
);
testStop
=
true
;
sleep
(
1
);
qWorkerDestroy
(
&
mgmt
);
}
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
3c0a7e63
...
...
@@ -38,11 +38,16 @@ enum {
typedef
struct
SSchedulerMgmt
{
uint64_t
taskId
;
uint64_t
s
cheduler
Id
;
uint64_t
sId
;
SSchedulerCfg
cfg
;
SHashObj
*
jobs
;
// key: queryId, value: SQueryJob*
}
SSchedulerMgmt
;
typedef
struct
SSchCallbackParam
{
uint64_t
queryId
;
uint64_t
taskId
;
}
SSchCallbackParam
;
typedef
struct
SSchLevel
{
int32_t
level
;
int8_t
status
;
...
...
@@ -120,6 +125,7 @@ typedef struct SSchJob {
extern
int32_t
schLaunchTask
(
SSchJob
*
job
,
SSchTask
*
task
);
extern
int32_t
schBuildAndSendMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
);
#ifdef __cplusplus
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
3c0a7e63
...
...
@@ -21,36 +21,6 @@
SSchedulerMgmt
schMgmt
=
{
0
};
int32_t
schBuildAndSendRequest
(
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
__taos_async_fn_t
fp
)
{
/*
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
if (pRequest == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SRequestMsgBody body = {0};
buildConnectMsg(pRequest, &body);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
tsem_wait(&pRequest->body.rspSem);
destroyConnectMsg(&body);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
printf("failed to connect to server, reason: %s\n\n", errorMsg);
destroyRequest(pRequest);
taos_close(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
destroyRequest(pRequest);
}
*/
}
int32_t
schBuildTaskRalation
(
SSchJob
*
job
,
SHashObj
*
planToTask
)
{
for
(
int32_t
i
=
0
;
i
<
job
->
levelNum
;
++
i
)
{
SSchLevel
*
level
=
taosArrayGet
(
job
->
levels
,
i
);
...
...
@@ -312,100 +282,6 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schAsyncSendMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
)
{
int32_t
msgSize
=
0
;
void
*
msg
=
NULL
;
switch
(
msgType
)
{
case
TDMT_VND_SUBMIT
:
{
if
(
NULL
==
task
->
msg
||
task
->
msgLen
<=
0
)
{
qError
(
"submit msg is NULL"
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
msgSize
=
task
->
msgLen
;
msg
=
task
->
msg
;
break
;
}
case
TDMT_VND_QUERY
:
{
if
(
NULL
==
task
->
msg
)
{
qError
(
"query msg is NULL"
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
msgSize
=
sizeof
(
SSubQueryMsg
)
+
task
->
msgLen
;
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SSubQueryMsg
*
pMsg
=
msg
;
pMsg
->
schedulerId
=
htobe64
(
schMgmt
.
schedulerId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
pMsg
->
contentLen
=
htonl
(
task
->
msgLen
);
memcpy
(
pMsg
->
msg
,
task
->
msg
,
task
->
msgLen
);
break
;
}
case
TDMT_VND_RES_READY
:
{
msgSize
=
sizeof
(
SResReadyMsg
);
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SResReadyMsg
*
pMsg
=
msg
;
pMsg
->
schedulerId
=
htobe64
(
schMgmt
.
schedulerId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
break
;
}
case
TDMT_VND_FETCH
:
{
if
(
NULL
==
task
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
msgSize
=
sizeof
(
SResFetchMsg
);
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SResFetchMsg
*
pMsg
=
msg
;
pMsg
->
schedulerId
=
htobe64
(
schMgmt
.
schedulerId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
break
;
}
case
TDMT_VND_DROP_TASK
:{
msgSize
=
sizeof
(
STaskDropMsg
);
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
STaskDropMsg
*
pMsg
=
msg
;
pMsg
->
schedulerId
=
htobe64
(
schMgmt
.
schedulerId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
break
;
}
default:
qError
(
"unknown msg type:%d"
,
msgType
);
break
;
}
//TODO SEND MSG
//taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schTaskCheckAndSetRetry
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
errCode
,
bool
*
needRetry
)
{
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info
...
...
@@ -424,7 +300,7 @@ int32_t schFetchFromRemote(SSchJob *job) {
return
TSDB_CODE_SUCCESS
;
}
SCH_ERR_JRET
(
sch
Async
SendMsg
(
job
,
job
->
fetchTask
,
TDMT_VND_FETCH
));
SCH_ERR_JRET
(
sch
BuildAnd
SendMsg
(
job
,
job
->
fetchTask
,
TDMT_VND_FETCH
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -577,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
sch
Handle
RspMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
)
{
int32_t
sch
Process
RspMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
switch
(
msgType
)
{
case
TDMT_VND_SUBMIT
:
{
case
TDMT_VND_SUBMIT
_RSP
:
{
SShellSubmitRspMsg
*
rsp
=
(
SShellSubmitRspMsg
*
)
msg
;
if
(
rsp
->
code
!=
TSDB_CODE_SUCCESS
)
{
SCH_ERR_JRET
(
schProcessOnTaskFailure
(
job
,
task
,
rsp
->
code
));
...
...
@@ -595,20 +471,20 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg
}
break
;
}
case
TDMT_VND_QUERY
:
{
case
TDMT_VND_QUERY
_RSP
:
{
SQueryTableRsp
*
rsp
=
(
SQueryTableRsp
*
)
msg
;
if
(
rsp
->
code
!=
TSDB_CODE_SUCCESS
)
{
SCH_ERR_JRET
(
schProcessOnTaskFailure
(
job
,
task
,
rsp
->
code
));
}
else
{
code
=
sch
Async
SendMsg
(
job
,
task
,
TDMT_VND_RES_READY
);
code
=
sch
BuildAnd
SendMsg
(
job
,
task
,
TDMT_VND_RES_READY
);
if
(
code
)
{
goto
_task_error
;
}
}
break
;
}
case
TDMT_VND_RES_READY
:
{
case
TDMT_VND_RES_READY
_RSP
:
{
SResReadyRsp
*
rsp
=
(
SResReadyRsp
*
)
msg
;
if
(
rsp
->
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -621,7 +497,7 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg
}
break
;
}
case
TDMT_VND_FETCH
:
{
case
TDMT_VND_FETCH
_RSP
:
{
SCH_ERR_JRET
(
rspCode
);
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
msg
;
...
...
@@ -631,6 +507,9 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg
SCH_ERR_JRET
(
schProcessOnDataFetched
(
job
));
break
;
}
case
TDMT_VND_DROP_TASK
:
{
}
default:
qError
(
"unknown msg type:%d received"
,
msgType
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
...
...
@@ -648,6 +527,211 @@ _return:
}
int32_t
schHandleCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
msgType
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SSchCallbackParam
*
pParam
=
(
SSchCallbackParam
*
)
param
;
SSchJob
**
job
=
taosHashGet
(
schMgmt
.
jobs
,
&
pParam
->
queryId
,
sizeof
(
pParam
->
queryId
));
if
(
NULL
==
job
||
NULL
==
(
*
job
))
{
qError
(
"taosHashGet queryId:%"
PRIx64
" not exist"
,
pParam
->
queryId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SSchTask
**
task
=
taosHashGet
((
*
job
)
->
execTasks
,
&
pParam
->
taskId
,
sizeof
(
pParam
->
taskId
));
if
(
NULL
==
task
||
NULL
==
(
*
task
))
{
qError
(
"taosHashGet taskId:%"
PRIx64
" not exist"
,
pParam
->
taskId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
schProcessRspMsg
(
*
job
,
*
task
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
);
_return:
tfree
(
param
);
SCH_RET
(
code
);
}
int32_t
schHandleSubmitCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_SUBMIT_RSP
,
code
);
}
int32_t
schHandleQueryCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_QUERY_RSP
,
code
);
}
int32_t
schHandleFetchCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_FETCH_RSP
,
code
);
}
int32_t
schHandleReadyCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
return
schHandleCallback
(
param
,
pMsg
,
TDMT_VND_RES_READY_RSP
,
code
);
}
int32_t
schHandleDropCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SSchCallbackParam
*
pParam
=
(
SSchCallbackParam
*
)
param
;
qDebug
(
"drop task rsp received, queryId:%"
PRIx64
",taksId:%"
PRIx64
",code:%d"
,
pParam
->
queryId
,
pParam
->
taskId
,
code
);
}
int32_t
schGetCallbackFp
(
int32_t
msgType
,
__async_send_cb_fn_t
*
fp
)
{
switch
(
msgType
)
{
case
TDMT_VND_SUBMIT
:
*
fp
=
schHandleSubmitCallback
;
break
;
case
TDMT_VND_QUERY
:
*
fp
=
schHandleQueryCallback
;
break
;
case
TDMT_VND_RES_READY
:
*
fp
=
schHandleReadyCallback
;
break
;
case
TDMT_VND_FETCH
:
*
fp
=
schHandleFetchCallback
;
break
;
case
TDMT_VND_DROP_TASK
:
*
fp
=
schHandleDropCallback
;
break
;
default:
qError
(
"unknown msg type:%d"
,
msgType
);
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schAsyncSendMsg
(
void
*
transport
,
SEpSet
*
epSet
,
uint64_t
qId
,
uint64_t
tId
,
int32_t
msgType
,
void
*
msg
,
uint32_t
msgSize
)
{
int32_t
code
=
0
;
SMsgSendInfo
*
pMsgSendInfo
=
calloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
NULL
==
pMsgSendInfo
)
{
qError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SMsgSendInfo
));
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SSchCallbackParam
*
param
=
calloc
(
1
,
sizeof
(
SSchCallbackParam
));
if
(
NULL
==
param
)
{
qError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SSchCallbackParam
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
__async_send_cb_fn_t
fp
=
NULL
;
SCH_ERR_JRET
(
schGetCallbackFp
(
msgType
,
&
fp
));
param
->
queryId
=
qId
;
param
->
taskId
=
tId
;
pMsgSendInfo
->
param
=
param
;
pMsgSendInfo
->
msgInfo
.
pData
=
msg
;
pMsgSendInfo
->
msgInfo
.
len
=
msgSize
;
pMsgSendInfo
->
msgType
=
msgType
;
pMsgSendInfo
->
fp
=
fp
;
int64_t
transporterId
=
0
;
SCH_ERR_JRET
(
asyncSendMsgToServer
(
transport
,
epSet
,
&
transporterId
,
pMsgSendInfo
));
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
param
);
tfree
(
pMsgSendInfo
);
SCH_RET
(
code
);
}
int32_t
schBuildAndSendMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
)
{
uint32_t
msgSize
=
0
;
void
*
msg
=
NULL
;
int32_t
code
=
0
;
switch
(
msgType
)
{
case
TDMT_VND_SUBMIT
:
{
if
(
NULL
==
task
->
msg
||
task
->
msgLen
<=
0
)
{
qError
(
"submit msg is NULL"
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
msgSize
=
task
->
msgLen
;
msg
=
task
->
msg
;
break
;
}
case
TDMT_VND_QUERY
:
{
if
(
NULL
==
task
->
msg
)
{
qError
(
"query msg is NULL"
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
msgSize
=
sizeof
(
SSubQueryMsg
)
+
task
->
msgLen
;
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SSubQueryMsg
*
pMsg
=
msg
;
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
pMsg
->
contentLen
=
htonl
(
task
->
msgLen
);
memcpy
(
pMsg
->
msg
,
task
->
msg
,
task
->
msgLen
);
break
;
}
case
TDMT_VND_RES_READY
:
{
msgSize
=
sizeof
(
SResReadyMsg
);
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SResReadyMsg
*
pMsg
=
msg
;
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
break
;
}
case
TDMT_VND_FETCH
:
{
if
(
NULL
==
task
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
msgSize
=
sizeof
(
SResFetchMsg
);
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SResFetchMsg
*
pMsg
=
msg
;
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
break
;
}
case
TDMT_VND_DROP_TASK
:{
msgSize
=
sizeof
(
STaskDropMsg
);
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
STaskDropMsg
*
pMsg
=
msg
;
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
break
;
}
default:
qError
(
"unknown msg type:%d"
,
msgType
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
break
;
}
SCH_ERR_JRET
(
schAsyncSendMsg
(
job
->
transport
,
&
task
->
plan
->
execEpSet
,
job
->
queryId
,
task
->
taskId
,
msgType
,
msg
,
msgSize
));
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
msg
);
SCH_RET
(
code
);
}
int32_t
schLaunchTask
(
SSchJob
*
job
,
SSchTask
*
task
)
{
...
...
@@ -664,7 +748,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
int32_t
msgType
=
(
plan
->
type
==
QUERY_TYPE_MODIFY
)
?
TDMT_VND_SUBMIT
:
TDMT_VND_QUERY
;
SCH_ERR_RET
(
sch
Async
SendMsg
(
job
,
task
,
msgType
));
SCH_ERR_RET
(
sch
BuildAnd
SendMsg
(
job
,
task
,
msgType
));
SCH_ERR_RET
(
schPushTaskToExecList
(
job
,
task
));
...
...
@@ -673,6 +757,8 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schLaunchJob
(
SSchJob
*
job
)
{
SSchLevel
*
level
=
taosArrayGet
(
job
->
levels
,
job
->
levelIdx
);
for
(
int32_t
i
=
0
;
i
<
level
->
taskNum
;
++
i
)
{
...
...
@@ -690,7 +776,7 @@ void schDropJobAllTasks(SSchJob *job) {
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
sch
Async
SendMsg
(
job
,
task
,
TDMT_VND_DROP_TASK
);
sch
BuildAnd
SendMsg
(
job
,
task
,
TDMT_VND_DROP_TASK
);
pIter
=
taosHashIterate
(
job
->
succTasks
,
pIter
);
}
...
...
@@ -699,7 +785,7 @@ void schDropJobAllTasks(SSchJob *job) {
while
(
pIter
)
{
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
sch
Async
SendMsg
(
job
,
task
,
TDMT_VND_DROP_TASK
);
sch
BuildAnd
SendMsg
(
job
,
task
,
TDMT_VND_DROP_TASK
);
pIter
=
taosHashIterate
(
job
->
succTasks
,
pIter
);
}
...
...
@@ -717,7 +803,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
SCH_ERR_LRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
,
"init %d schduler jobs failed"
,
schMgmt
.
cfg
.
maxJobNum
);
}
schMgmt
.
s
cheduler
Id
=
1
;
//TODO GENERATE A UUID
schMgmt
.
sId
=
1
;
//TODO GENERATE A UUID
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
3c0a7e63
...
...
@@ -36,7 +36,7 @@
namespace
{
extern
"C"
int32_t
sch
Handle
RspMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
extern
"C"
int32_t
sch
Process
RspMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
);
void
schtBuildQueryDag
(
SQueryDag
*
dag
)
{
uint64_t
qId
=
0x0000000000000001
;
...
...
@@ -182,7 +182,7 @@ void *schtSendRsp(void *param) {
SShellSubmitRspMsg
rsp
=
{
0
};
rsp
.
affectedRows
=
10
;
sch
Handle
RspMsg
(
job
,
task
,
TDMT_VND_SUBMIT
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
sch
Process
RspMsg
(
job
,
task
,
TDMT_VND_SUBMIT
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
}
...
...
@@ -227,7 +227,7 @@ TEST(queryTest, normalCase) {
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
sch
Handle
RspMsg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
sch
Process
RspMsg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
...
...
@@ -238,7 +238,7 @@ TEST(queryTest, normalCase) {
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
sch
Handle
RspMsg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
sch
Process
RspMsg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
...
...
@@ -249,7 +249,7 @@ TEST(queryTest, normalCase) {
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SQueryTableRsp
rsp
=
{
0
};
code
=
sch
Handle
RspMsg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
sch
Process
RspMsg
(
job
,
task
,
TDMT_VND_QUERY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
...
...
@@ -260,7 +260,7 @@ TEST(queryTest, normalCase) {
SSchTask
*
task
=
*
(
SSchTask
**
)
pIter
;
SResReadyRsp
rsp
=
{
0
};
code
=
sch
Handle
RspMsg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
sch
Process
RspMsg
(
job
,
task
,
TDMT_VND_RES_READY
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
pIter
=
taosHashIterate
(
job
->
execTasks
,
pIter
);
...
...
@@ -269,7 +269,7 @@ TEST(queryTest, normalCase) {
SRetrieveTableRsp
rsp
=
{
0
};
rsp
.
completed
=
1
;
rsp
.
numOfRows
=
10
;
code
=
sch
Handle
RspMsg
(
job
,
NULL
,
TDMT_VND_FETCH
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
code
=
sch
Process
RspMsg
(
job
,
NULL
,
TDMT_VND_FETCH
,
(
char
*
)
&
rsp
,
sizeof
(
rsp
),
0
);
ASSERT_EQ
(
code
,
0
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录