Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b3f9f81b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b3f9f81b
编写于
6月 16, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
support show apps
上级
7156f525
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
277 addition
and
233 deletion
+277
-233
include/common/tmsg.h
include/common/tmsg.h
+0
-2
include/libs/qcom/query.h
include/libs/qcom/query.h
+0
-2
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+1
-1
include/util/taoserror.h
include/util/taoserror.h
+2
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+1
-1
source/client/src/clientHb.c
source/client/src/clientHb.c
+2
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+4
-4
source/client/src/clientMain.c
source/client/src/clientMain.c
+13
-6
source/common/src/systable.c
source/common/src/systable.c
+2
-2
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+5
-3
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+16
-5
source/dnode/mnode/impl/test/profile/profile.cpp
source/dnode/mnode/impl/test/profile/profile.cpp
+1
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+2
-0
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+1
-1
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+0
-4
source/libs/qworker/src/qwDbg.c
source/libs/qworker/src/qwDbg.c
+4
-14
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+19
-5
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+155
-155
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+0
-2
source/libs/scheduler/src/schUtil.c
source/libs/scheduler/src/schUtil.c
+12
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+26
-15
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+5
-5
source/util/src/terror.c
source/util/src/terror.c
+2
-0
tests/script/tsim/show/basic.sim
tests/script/tsim/show/basic.sim
+3
-3
未找到文件。
include/common/tmsg.h
浏览文件 @
b3f9f81b
...
...
@@ -1315,8 +1315,6 @@ int32_t tSerializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq
int32_t
tDeserializeSSetStandbyReq
(
void
*
buf
,
int32_t
bufLen
,
SSetStandbyReq
*
pReq
);
typedef
struct
{
int32_t
connId
;
// todo remove
int32_t
queryId
;
// todo remove
char
queryStrId
[
TSDB_QUERY_ID_LEN
];
}
SKillQueryReq
;
...
...
include/libs/qcom/query.h
浏览文件 @
b3f9f81b
...
...
@@ -33,8 +33,6 @@ typedef enum {
JOB_TASK_STATUS_PARTIAL_SUCCEED
,
JOB_TASK_STATUS_SUCCEED
,
JOB_TASK_STATUS_FAILED
,
JOB_TASK_STATUS_CANCELLING
,
JOB_TASK_STATUS_CANCELLED
,
JOB_TASK_STATUS_DROPPING
,
}
EJobTaskType
;
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
b3f9f81b
...
...
@@ -127,7 +127,7 @@ void schedulerStopQueryHb(void *pTrans);
* Free the query job
* @param pJob
*/
void
schedulerFreeJob
(
int64_t
job
);
void
schedulerFreeJob
(
int64_t
job
,
int32_t
errCode
);
void
schedulerDestroy
(
void
);
...
...
include/util/taoserror.h
浏览文件 @
b3f9f81b
...
...
@@ -127,6 +127,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_STMT_API_ERROR TAOS_DEF_ERROR_CODE(0, 0X0225)
#define TSDB_CODE_TSC_STMT_TBNAME_ERROR TAOS_DEF_ERROR_CODE(0, 0X0226)
#define TSDB_CODE_TSC_STMT_CLAUSE_ERROR TAOS_DEF_ERROR_CODE(0, 0X0227)
#define TSDB_CODE_TSC_QUERY_KILLED TAOS_DEF_ERROR_CODE(0, 0X0228)
// mnode-common
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
...
...
@@ -570,6 +571,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502)
#define TSDB_CODE_SCH_IGNORE_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503)
#define TSDB_CODE_SCH_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504)
#define TSDB_CODE_SCH_JOB_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x2505)
#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2550)
//parser
...
...
source/client/inc/clientInt.h
浏览文件 @
b3f9f81b
...
...
@@ -204,6 +204,7 @@ typedef struct SRequestObj {
SRequestSendRecvBody
body
;
bool
stableQuery
;
bool
killed
;
uint32_t
prevCode
;
//previous error code: todo refactor, add update flag for catalog
uint32_t
retry
;
}
SRequestObj
;
...
...
source/client/src/clientEnv.c
浏览文件 @
b3f9f81b
...
...
@@ -229,7 +229,7 @@ static void doDestroyRequest(void *p) {
taosHashRemove
(
pRequest
->
pTscObj
->
pRequests
,
&
pRequest
->
self
,
sizeof
(
pRequest
->
self
));
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
taosMemoryFreeClear
(
pRequest
->
msgBuf
);
...
...
source/client/src/clientHb.c
浏览文件 @
b3f9f81b
...
...
@@ -164,6 +164,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
pTscObj
->
connId
=
pRsp
->
query
->
connId
;
if
(
pRsp
->
query
->
killRid
)
{
tscDebug
(
"request rid %"
PRIx64
" need to be killed now"
,
pRsp
->
query
->
killRid
);
SRequestObj
*
pRequest
=
acquireRequest
(
pRsp
->
query
->
killRid
);
if
(
NULL
==
pRequest
)
{
tscDebug
(
"request 0x%"
PRIx64
" not exist to kill"
,
pRsp
->
query
->
killRid
);
...
...
@@ -304,7 +305,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
while
(
pIter
!=
NULL
)
{
int64_t
*
rid
=
pIter
;
SRequestObj
*
pRequest
=
acquireRequest
(
*
rid
);
if
(
NULL
==
pRequest
)
{
if
(
NULL
==
pRequest
||
pRequest
->
killed
)
{
pIter
=
taosHashIterate
(
pObj
->
pRequests
,
pIter
);
continue
;
}
...
...
source/client/src/clientImpl.c
浏览文件 @
b3f9f81b
...
...
@@ -418,7 +418,7 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
while
(
true
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
pRequest
->
code
=
code
;
...
...
@@ -439,7 +439,7 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
pRequest
->
body
.
resInfo
.
numOfRows
=
res
.
numOfRows
;
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
}
...
...
@@ -468,7 +468,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
pRequest
->
code
=
code
;
...
...
@@ -481,7 +481,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
pRequest
->
body
.
resInfo
.
numOfRows
=
res
.
numOfRows
;
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
0
);
}
}
...
...
source/client/src/clientMain.c
浏览文件 @
b3f9f81b
...
...
@@ -246,13 +246,14 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
if
(
TD_RES_QUERY
(
res
))
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
#if SYNC_ON_TOP_OF_ASYNC
return
doAsyncFetchRows
(
pRequest
,
true
,
true
);
#else
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
)
{
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
||
pRequest
->
killed
)
{
return
NULL
;
}
#if SYNC_ON_TOP_OF_ASYNC
return
doAsyncFetchRows
(
pRequest
,
true
,
true
);
#else
return
doFetchRows
(
pRequest
,
true
,
true
);
#endif
...
...
@@ -482,14 +483,20 @@ void taos_stop_query(TAOS_RES *res) {
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
pRequest
->
killed
=
true
;
int32_t
numOfFields
=
taos_num_fields
(
pRequest
);
// It is not a query, no need to stop.
if
(
numOfFields
==
0
)
{
tscDebug
(
"request %"
PRIx64
" no need to be killed since not query"
,
pRequest
->
requestId
);
return
;
}
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
if
(
pRequest
->
body
.
queryJob
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
,
TSDB_CODE_TSC_QUERY_KILLED
);
}
tscDebug
(
"request %"
PRIx64
" killed"
,
pRequest
->
requestId
);
}
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
)
{
...
...
source/common/src/systable.c
浏览文件 @
b3f9f81b
...
...
@@ -77,7 +77,7 @@ static const SSysDbTableSchema userDBSchema[] = {
{.
name
=
"ntables"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
{.
name
=
"replica"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_TINYINT
},
{.
name
=
"strict"
,
.
bytes
=
9
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"duration"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"duration"
,
.
bytes
=
10
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"keep"
,
.
bytes
=
32
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"buffer"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"pagesize"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
...
...
@@ -302,7 +302,7 @@ static const SSysDbTableSchema offsetSchema[] = {
};
static
const
SSysDbTableSchema
querySchema
[]
=
{
{.
name
=
"query_id"
,
.
bytes
=
26
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"query_id"
,
.
bytes
=
TSDB_QUERY_ID_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"req_id"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_UBIGINT
},
{.
name
=
"connId"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_UINT
},
{.
name
=
"app"
,
.
bytes
=
TSDB_APP_NAME_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
b3f9f81b
...
...
@@ -1391,11 +1391,13 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
rows
,
(
const
char
*
)
strict
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
rows
,
(
const
char
*
)
&
pDb
->
cfg
.
daysPerFile
,
false
);
char
tmp
[
128
]
=
{
0
};
int32_t
len
=
0
;
len
=
sprintf
(
&
tmp
[
VARSTR_HEADER_SIZE
],
"%dm"
,
pDb
->
cfg
.
daysPerFile
);
varDataSetLen
(
tmp
,
len
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
rows
,
(
const
char
*
)
tmp
,
false
);
if
(
pDb
->
cfg
.
daysToKeep0
>
pDb
->
cfg
.
daysToKeep1
||
pDb
->
cfg
.
daysToKeep0
>
pDb
->
cfg
.
daysToKeep2
)
{
len
=
sprintf
(
&
tmp
[
VARSTR_HEADER_SIZE
],
"%dm,%dm,%dm"
,
pDb
->
cfg
.
daysToKeep1
,
pDb
->
cfg
.
daysToKeep2
,
pDb
->
cfg
.
daysToKeep0
);
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
b3f9f81b
...
...
@@ -634,16 +634,27 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
return
-
1
;
}
mInfo
(
"kill query msg is received, queryId:%d"
,
killReq
.
queryId
);
mInfo
(
"kill query msg is received, queryId:%s"
,
killReq
.
queryStrId
);
int32_t
connId
=
0
;
uint64_t
queryId
=
0
;
char
*
p
=
strchr
(
killReq
.
queryStrId
,
':'
);
if
(
NULL
==
p
)
{
mError
(
"invalid query id %s"
,
killReq
.
queryStrId
);
terrno
=
TSDB_CODE_MND_INVALID_QUERY_ID
;
return
-
1
;
}
*
p
=
0
;
connId
=
taosStr2Int32
(
killReq
.
queryStrId
,
NULL
,
16
);
queryId
=
taosStr2UInt64
(
p
+
1
,
NULL
,
16
);
SConnObj
*
pConn
=
taosCacheAcquireByKey
(
pMgmt
->
connCache
,
&
killReq
.
connId
,
sizeof
(
int32_t
));
SConnObj
*
pConn
=
taosCacheAcquireByKey
(
pMgmt
->
connCache
,
&
connId
,
sizeof
(
int32_t
));
if
(
pConn
==
NULL
)
{
mError
(
"connId:%
d, failed to kill queryId:%d, conn not exist"
,
killReq
.
connId
,
killReq
.
queryId
);
mError
(
"connId:%
x, failed to kill queryId:%"
PRIx64
", conn not exist"
,
connId
,
queryId
);
terrno
=
TSDB_CODE_MND_INVALID_CONN_ID
;
return
-
1
;
}
else
{
mInfo
(
"connId:%
d, queryId:%d is killed by user:%s"
,
killReq
.
connId
,
killReq
.
queryId
,
pReq
->
conn
.
user
);
pConn
->
killId
=
killReq
.
queryId
;
mInfo
(
"connId:%
x, queryId:%"
PRIx64
" is killed by user:%s"
,
connId
,
queryId
,
pReq
->
conn
.
user
);
pConn
->
killId
=
queryId
;
taosCacheRelease
(
pMgmt
->
connCache
,
(
void
**
)
&
pConn
,
false
);
return
0
;
}
...
...
source/dnode/mnode/impl/test/profile/profile.cpp
浏览文件 @
b3f9f81b
...
...
@@ -295,8 +295,7 @@ TEST_F(MndTestProfile, 07_KillQueryMsg) {
TEST_F
(
MndTestProfile
,
08
_KillQueryMsg_InvalidConn
)
{
SKillQueryReq
killReq
=
{
0
};
killReq
.
connId
=
2345
;
killReq
.
queryId
=
2345
;
strcpy
(
killReq
.
queryStrId
,
"2345:2345"
);
int32_t
contLen
=
tSerializeSKillQueryReq
(
NULL
,
0
,
&
killReq
);
void
*
pReq
=
rpcMallocCont
(
contLen
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
b3f9f81b
...
...
@@ -537,6 +537,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto
_error
;
}
//taosSsleep(20);
SDataBlockDescNode
*
pDescNode
=
pTableScanNode
->
scan
.
node
.
pOutputDataBlockDesc
;
int32_t
numOfCols
=
0
;
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
b3f9f81b
...
...
@@ -1449,7 +1449,7 @@ SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId) {
CHECK_PARSER_STATUS
(
pCxt
);
SKillQueryStmt
*
pStmt
=
(
SKillQueryStmt
*
)
nodesMakeNode
(
QUERY_NODE_KILL_QUERY_STMT
);
CHECK_OUT_OF_MEM
(
pStmt
);
strncpy
(
pStmt
->
queryId
,
pQueryId
->
z
,
TMIN
(
pQueryId
->
n
,
sizeof
(
pStmt
->
queryId
)
-
1
)
);
trimString
(
pQueryId
->
z
,
pQueryId
->
n
,
pStmt
->
queryId
,
sizeof
(
pStmt
->
queryId
)
-
1
);
return
(
SNode
*
)
pStmt
;
}
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
b3f9f81b
...
...
@@ -177,10 +177,6 @@ char* jobTaskStatusStr(int32_t status) {
return
"SUCCEED"
;
case
JOB_TASK_STATUS_FAILED
:
return
"FAILED"
;
case
JOB_TASK_STATUS_CANCELLING
:
return
"CANCELLING"
;
case
JOB_TASK_STATUS_CANCELLED
:
return
"CANCELLED"
;
case
JOB_TASK_STATUS_DROPPING
:
return
"DROPPING"
;
default:
...
...
source/libs/qworker/src/qwDbg.c
浏览文件 @
b3f9f81b
...
...
@@ -44,40 +44,30 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
break
;
case
JOB_TASK_STATUS_EXECUTING
:
if
(
newStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLING
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_PARTIAL_SUCCEED
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_SUCCEED
:
if
(
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_DROPPING
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_FAILED
:
if
(
newStatus
!=
JOB_TASK_STATUS_
CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_
DROPPING
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_CANCELLING
:
if
(
newStatus
!=
JOB_TASK_STATUS_CANCELLED
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_CANCELLED
:
case
JOB_TASK_STATUS_DROPPING
:
if
(
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
b3f9f81b
...
...
@@ -48,6 +48,12 @@ enum {
SCH_FETCH_CB
,
};
typedef
enum
{
SCH_OP_NULL
=
0
,
SCH_OP_EXEC
,
SCH_OP_FETCH
,
}
SCH_OP_TYPE
;
typedef
struct
SSchTrans
{
void
*
pTrans
;
void
*
pHandle
;
...
...
@@ -188,11 +194,15 @@ typedef struct SSchTask {
typedef
struct
SSchJobAttr
{
EExplainMode
explainMode
;
bool
syncSchedule
;
bool
queryJob
;
bool
needFlowCtrl
;
}
SSchJobAttr
;
typedef
struct
{
int32_t
op
;
bool
sync
;
}
SSchOpStatus
;
typedef
struct
SSchJob
{
int64_t
refId
;
uint64_t
queryId
;
...
...
@@ -217,8 +227,7 @@ typedef struct SSchJob {
int8_t
status
;
SQueryNodeAddr
resNode
;
tsem_t
rspSem
;
int8_t
userFetch
;
int32_t
remoteFetch
;
SSchOpStatus
opStatus
;
SSchTask
*
fetchTask
;
int32_t
errCode
;
SRWLatch
resLock
;
...
...
@@ -227,7 +236,6 @@ typedef struct SSchJob {
int32_t
resNumOfRows
;
SSchResInfo
userRes
;
const
char
*
sql
;
int32_t
userCb
;
SQueryProfileSummary
summary
;
}
SSchJob
;
...
...
@@ -285,6 +293,10 @@ extern SSchedulerMgmt schMgmt;
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.sync)
#define SCH_JOB_IN_ASYNC_EXEC_OP(job) (((job)->opStatus.op == SCH_OP_EXEC) && (!(job)->opStatus.sync))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job) (((job)->opStatus.op == SCH_OP_FETCH) && (!(job)->opStatus.sync))
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
...
...
@@ -356,7 +368,7 @@ int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *
int32_t
schAppendTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
execIdx
);
int32_t
schExecStaticExplainJob
(
SSchedulerReq
*
pReq
,
int64_t
*
job
,
bool
sync
);
int32_t
schExecJobImpl
(
SSchedulerReq
*
pReq
,
int64_t
*
job
,
SQueryResult
*
pRes
,
bool
sync
);
int32_t
sch
Chk
UpdateJobStatus
(
SSchJob
*
pJob
,
int8_t
newStatus
);
int32_t
schUpdateJobStatus
(
SSchJob
*
pJob
,
int8_t
newStatus
);
int32_t
schCancelJob
(
SSchJob
*
pJob
);
int32_t
schProcessOnJobDropped
(
SSchJob
*
pJob
,
int32_t
errCode
);
uint64_t
schGenTaskId
(
void
);
...
...
@@ -368,6 +380,8 @@ int32_t schAsyncFetchRows(SSchJob *pJob);
int32_t
schUpdateTaskHandle
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
dropExecNode
,
void
*
handle
,
int32_t
execIdx
);
int32_t
schProcessOnTaskStatusRsp
(
SQueryNodeEpId
*
pEpId
,
SArray
*
pStatusList
);
void
schFreeSMsgSendInfo
(
SMsgSendInfo
*
msgSendInfo
);
char
*
schGetOpStr
(
SCH_OP_TYPE
type
);
int32_t
schBeginOperation
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
bool
sync
);
#ifdef __cplusplus
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
b3f9f81b
...
...
@@ -52,7 +52,6 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b
}
pJob
->
attr
.
explainMode
=
pReq
->
pDag
->
explainInfo
.
mode
;
pJob
->
attr
.
syncSchedule
=
syncSchedule
;
pJob
->
conn
=
*
pReq
->
pConn
;
pJob
->
sql
=
pReq
->
sql
;
pJob
->
userRes
.
queryRes
=
pRes
;
...
...
@@ -161,12 +160,11 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
*
pStatus
=
status
;
}
return
(
status
==
JOB_TASK_STATUS_FAILED
||
status
==
JOB_TASK_STATUS_CANCELLED
||
status
==
JOB_TASK_STATUS_CANCELLING
||
status
==
JOB_TASK_STATUS_DROPPING
||
return
(
status
==
JOB_TASK_STATUS_FAILED
||
status
==
JOB_TASK_STATUS_DROPPING
||
status
==
JOB_TASK_STATUS_SUCCEED
);
}
int32_t
sch
Chk
UpdateJobStatus
(
SSchJob
*
pJob
,
int8_t
newStatus
)
{
int32_t
schUpdateJobStatus
(
SSchJob
*
pJob
,
int8_t
newStatus
)
{
int32_t
code
=
0
;
int8_t
oriStatus
=
0
;
...
...
@@ -175,7 +173,11 @@ int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
oriStatus
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
oriStatus
==
newStatus
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
if
(
newStatus
==
JOB_TASK_STATUS_DROPPING
)
{
SCH_ERR_JRET
(
TSDB_CODE_SCH_JOB_IS_DROPPING
);
}
SCH_ERR_JRET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
}
switch
(
oriStatus
)
{
...
...
@@ -193,7 +195,6 @@ int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break
;
case
JOB_TASK_STATUS_EXECUTING
:
if
(
newStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLING
&&
newStatus
!=
JOB_TASK_STATUS_CANCELLED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
@@ -208,13 +209,11 @@ int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break
;
case
JOB_TASK_STATUS_SUCCEED
:
case
JOB_TASK_STATUS_FAILED
:
case
JOB_TASK_STATUS_CANCELLING
:
if
(
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_CANCELLED
:
case
JOB_TASK_STATUS_DROPPING
:
SCH_ERR_JRET
(
TSDB_CODE_QRY_JOB_FREED
);
break
;
...
...
@@ -238,8 +237,65 @@ int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
_return:
SCH_JOB_ELOG
(
"invalid job status update, from %s to %s"
,
jobTaskStatusStr
(
oriStatus
),
jobTaskStatusStr
(
newStatus
));
SCH_ERR_RET
(
code
);
SCH_RET
(
code
);
}
void
schEndOperation
(
SSchJob
*
pJob
)
{
int32_t
op
=
atomic_load_32
(
&
pJob
->
opStatus
.
op
);
if
(
SCH_OP_NULL
==
op
)
{
SCH_JOB_DLOG
(
"job already not in any operation, status:%s"
,
jobTaskStatusStr
(
pJob
->
status
));
return
;
}
atomic_store_32
(
&
pJob
->
opStatus
.
op
,
SCH_OP_NULL
);
SCH_JOB_DLOG
(
"job end %s operation"
,
schGetOpStr
(
op
));
}
int32_t
schBeginOperation
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
bool
sync
)
{
int32_t
code
=
0
;
if
(
SCH_OP_NULL
!=
atomic_val_compare_exchange_32
(
&
pJob
->
opStatus
.
op
,
SCH_OP_NULL
,
type
))
{
SCH_JOB_ELOG
(
"job already in %s operation"
,
schGetOpStr
(
pJob
->
opStatus
.
op
));
SCH_ERR_JRET
(
TSDB_CODE_TSC_APP_ERROR
);
}
SCH_JOB_ELOG
(
"job start %s operation"
,
schGetOpStr
(
pJob
->
opStatus
.
op
));
pJob
->
opStatus
.
sync
=
sync
;
switch
(
type
)
{
case
SCH_OP_EXEC
:
SCH_ERR_JRET
(
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_EXECUTING
));
break
;
case
SCH_OP_FETCH
:
if
(
!
SCH_JOB_NEED_FETCH
(
pJob
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
SCH_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_JOB_ELOG
(
"job need to stop cause of status %s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
else
if
(
status
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_JOB_ELOG
(
"job status error for fetch, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
break
;
default:
SCH_JOB_ELOG
(
"unknown operation type %d"
,
type
);
SCH_ERR_JRET
(
TSDB_CODE_TSC_APP_ERROR
);
}
return
TSDB_CODE_SUCCESS
;
_return:
schEndOperation
(
pJob
);
SCH_RET
(
code
);
}
int32_t
schBuildTaskRalation
(
SSchJob
*
pJob
,
SHashObj
*
planToTask
)
{
...
...
@@ -828,7 +884,7 @@ int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) {
int32_t
schSetJobFetchRes
(
SSchJob
*
pJob
,
void
**
pData
)
{
int32_t
code
=
0
;
if
(
pJob
->
resData
&&
((
SRetrieveTableRsp
*
)
pJob
->
resData
)
->
completed
)
{
SCH_ERR_RET
(
sch
Chk
UpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCCEED
));
SCH_ERR_RET
(
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCCEED
));
}
while
(
true
)
{
...
...
@@ -855,15 +911,17 @@ int32_t schSetJobFetchRes(SSchJob* pJob, void** pData) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schNotifyUser
Query
Res
(
SSchJob
*
pJob
)
{
pJob
->
userRes
.
queryRes
=
taosMemoryCalloc
(
1
,
sizeof
(
*
pJob
->
userRes
.
queryRes
));
if
(
p
Job
->
userRes
.
query
Res
)
{
schSetJobQueryRes
(
pJob
,
p
Job
->
userRes
.
query
Res
);
int32_t
schNotifyUser
Exec
Res
(
SSchJob
*
pJob
)
{
SQueryResult
*
pRes
=
taosMemoryCalloc
(
1
,
sizeof
(
SQueryResult
));
if
(
pRes
)
{
schSetJobQueryRes
(
pJob
,
pRes
);
}
(
*
pJob
->
userRes
.
execFp
)(
pJob
->
userRes
.
queryRes
,
pJob
->
userRes
.
userParam
,
atomic_load_32
(
&
pJob
->
errCode
)
);
schEndOperation
(
pJob
);
pJob
->
userRes
.
queryRes
=
NULL
;
SCH_JOB_DLOG
(
"sch start to invoke exec cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
(
*
pJob
->
userRes
.
execFp
)(
pRes
,
pJob
->
userRes
.
userParam
,
atomic_load_32
(
&
pJob
->
errCode
));
SCH_JOB_DLOG
(
"sch end from query cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -871,35 +929,49 @@ int32_t schNotifyUserQueryRes(SSchJob* pJob) {
int32_t
schNotifyUserFetchRes
(
SSchJob
*
pJob
)
{
void
*
pRes
=
NULL
;
SCH_ERR_RET
(
schSetJobFetchRes
(
pJob
,
&
pRes
));
schSetJobFetchRes
(
pJob
,
&
pRes
);
schEndOperation
(
pJob
);
SCH_JOB_DLOG
(
"sch start to invoke fetch cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
(
*
pJob
->
userRes
.
fetchFp
)(
pRes
,
pJob
->
userRes
.
userParam
,
atomic_load_32
(
&
pJob
->
errCode
));
SCH_JOB_DLOG
(
"sch end from fetch cb, code: %s"
,
tstrerror
(
pJob
->
errCode
));
return
TSDB_CODE_SUCCESS
;
}
void
schPostJobRes
(
SSchJob
*
pJob
,
SCH_OP_TYPE
op
)
{
if
(
SCH_OP_NULL
==
pJob
->
opStatus
.
op
)
{
SCH_JOB_DLOG
(
"job not in any op, no need to post job res, status:%s"
,
jobTaskStatusStr
(
pJob
->
status
));
return
;
}
if
(
op
&&
pJob
->
opStatus
.
op
!=
op
)
{
SCH_JOB_ELOG
(
"job in op %s mis-match with expected %s"
,
schGetOpStr
(
pJob
->
opStatus
.
op
),
schGetOpStr
(
op
));
return
;
}
if
(
SCH_JOB_IN_SYNC_OP
(
pJob
))
{
tsem_post
(
&
pJob
->
rspSem
);
}
else
if
(
SCH_JOB_IN_ASYNC_EXEC_OP
(
pJob
))
{
schNotifyUserExecRes
(
pJob
);
}
else
if
(
SCH_JOB_IN_ASYNC_FETCH_OP
(
pJob
))
{
schNotifyUserFetchRes
(
pJob
);
}
else
{
SCH_JOB_ELOG
(
"job not in any operation, status:%s"
,
jobTaskStatusStr
(
pJob
->
status
));
}
}
int32_t
schProcessOnJobFailureImpl
(
SSchJob
*
pJob
,
int32_t
status
,
int32_t
errCode
)
{
// if already FAILED, no more processing
SCH_ERR_RET
(
sch
Chk
UpdateJobStatus
(
pJob
,
status
));
SCH_ERR_RET
(
schUpdateJobStatus
(
pJob
,
status
));
schUpdateJobErrCode
(
pJob
,
errCode
);
if
(
atomic_load_8
(
&
pJob
->
userFetch
)
||
pJob
->
attr
.
syncSchedule
)
{
tsem_post
(
&
pJob
->
rspSem
);
}
int32_t
code
=
atomic_load_32
(
&
pJob
->
errCode
);
SCH_JOB_DLOG
(
"job failed with error: %s"
,
tstrerror
(
code
));
if
(
!
pJob
->
attr
.
syncSchedule
)
{
if
(
SCH_EXEC_CB
==
atomic_val_compare_exchange_32
(
&
pJob
->
userCb
,
SCH_EXEC_CB
,
0
))
{
schNotifyUserQueryRes
(
pJob
);
}
else
if
(
SCH_FETCH_CB
==
atomic_val_compare_exchange_32
(
&
pJob
->
userCb
,
SCH_FETCH_CB
,
0
))
{
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
schNotifyUserFetchRes
(
pJob
);
}
}
schPostJobRes
(
pJob
,
0
);
SCH_RET
(
code
);
}
...
...
@@ -918,20 +990,9 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
int32_t
schProcessOnJobPartialSuccess
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
SCH_ERR_RET
(
sch
Chk
UpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_PARTIAL_SUCCEED
));
SCH_ERR_RET
(
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_PARTIAL_SUCCEED
));
if
(
pJob
->
attr
.
syncSchedule
)
{
tsem_post
(
&
pJob
->
rspSem
);
}
else
if
(
SCH_EXEC_CB
==
atomic_val_compare_exchange_32
(
&
pJob
->
userCb
,
SCH_EXEC_CB
,
0
))
{
schNotifyUserQueryRes
(
pJob
);
}
else
if
(
SCH_FETCH_CB
==
atomic_val_compare_exchange_32
(
&
pJob
->
userCb
,
SCH_FETCH_CB
,
0
))
{
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
schNotifyUserFetchRes
(
pJob
);
}
if
(
atomic_load_8
(
&
pJob
->
userFetch
))
{
SCH_ERR_JRET
(
schFetchFromRemote
(
pJob
));
}
schPostJobRes
(
pJob
,
SCH_OP_EXEC
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -940,16 +1001,8 @@ _return:
SCH_RET
(
schProcessOnJobFailure
(
pJob
,
code
));
}
void
schProcessOnDataFetched
(
SSchJob
*
job
)
{
atomic_val_compare_exchange_32
(
&
job
->
remoteFetch
,
1
,
0
);
if
(
job
->
attr
.
syncSchedule
)
{
tsem_post
(
&
job
->
rspSem
);
}
else
if
(
SCH_FETCH_CB
==
atomic_val_compare_exchange_32
(
&
job
->
userCb
,
SCH_FETCH_CB
,
0
))
{
atomic_val_compare_exchange_8
(
&
job
->
userFetch
,
1
,
0
);
schNotifyUserFetchRes
(
job
);
}
void
schProcessOnDataFetched
(
SSchJob
*
pJob
)
{
schPostJobRes
(
pJob
,
SCH_OP_FETCH
);
}
// Note: no more task error processing, handled in function internal
...
...
@@ -1127,15 +1180,8 @@ _return:
int32_t
schFetchFromRemote
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
if
(
atomic_val_compare_exchange_32
(
&
pJob
->
remoteFetch
,
0
,
1
)
!=
0
)
{
SCH_JOB_ELOG
(
"prior fetching not finished, remoteFetch:%d"
,
atomic_load_32
(
&
pJob
->
remoteFetch
));
return
TSDB_CODE_SUCCESS
;
}
void
*
resData
=
atomic_load_ptr
(
&
pJob
->
resData
);
if
(
resData
)
{
atomic_val_compare_exchange_32
(
&
pJob
->
remoteFetch
,
1
,
0
);
SCH_JOB_DLOG
(
"res already fetched, res:%p"
,
resData
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1146,8 +1192,6 @@ int32_t schFetchFromRemote(SSchJob *pJob) {
_return:
atomic_val_compare_exchange_32
(
&
pJob
->
remoteFetch
,
1
,
0
);
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pJob
->
fetchTask
,
code
));
}
...
...
@@ -1382,8 +1426,6 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
int32_t
schLaunchJob
(
SSchJob
*
pJob
)
{
SSchLevel
*
level
=
taosArrayGet
(
pJob
->
levels
,
pJob
->
levelIdx
);
SCH_ERR_RET
(
schChkUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_EXECUTING
));
SCH_ERR_RET
(
schChkJobNeedFlowCtrl
(
pJob
,
level
));
SCH_ERR_RET
(
schLaunchLevelTasks
(
pJob
,
level
));
...
...
@@ -1483,24 +1525,36 @@ int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bo
int32_t
code
=
0
;
SSchJob
*
pJob
=
NULL
;
SCH_ERR_RET
(
schInitJob
(
pReq
,
&
pJob
,
pRes
,
sync
));
SCH_ERR_
J
RET
(
schInitJob
(
pReq
,
&
pJob
,
pRes
,
sync
));
qDebug
(
"QID:0x%"
PRIx64
" job refId 0x%"
PRIx64
" started"
,
pReq
->
pDag
->
queryId
,
pJob
->
refId
);
qDebug
(
"QID:0x%"
PRIx64
"
sch
job refId 0x%"
PRIx64
" started"
,
pReq
->
pDag
->
queryId
,
pJob
->
refId
);
*
job
=
pJob
->
refId
;
SCH_ERR_JRET
(
schLaunchJob
(
pJob
));
SCH_ERR_JRET
(
schBeginOperation
(
pJob
,
SCH_OP_EXEC
,
sync
));
code
=
schLaunchJob
(
pJob
);
if
(
sync
)
{
SCH_JOB_DLOG
(
"will wait for rsp now, job status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
tsem_wait
(
&
pJob
->
rspSem
);
}
else
{
pJob
->
userCb
=
SCH_EXEC_CB
;
schEndOperation
(
pJob
);
}
else
if
(
code
)
{
schPostJobRes
(
pJob
,
SCH_OP_EXEC
);
}
SCH_JOB_DLOG
(
"job exec done, job status:%s, jobId:0x%"
PRIx64
,
SCH_GET_JOB_STATUS_STR
(
pJob
),
pJob
->
refId
);
SCH_JOB_DLOG
(
"job exec done, job status:%s, jobId:0x%"
PRIx64
,
SCH_GET_JOB_STATUS_STR
(
pJob
),
pJob
->
refId
);
schReleaseJob
(
pJob
->
refId
);
SCH_RET
(
code
);
_return:
if
(
!
sync
)
{
pReq
->
fp
(
NULL
,
pReq
->
cbParam
,
code
);
}
schReleaseJob
(
pJob
->
refId
);
SCH_RET
(
code
);
...
...
@@ -1534,10 +1588,10 @@ int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob) {
*
pJob
=
0
;
if
(
EXPLAIN_MODE_STATIC
==
pReq
->
pDag
->
explainInfo
.
mode
)
{
SCH_ERR_RET
(
schExecStaticExplainJob
(
pReq
,
pJob
,
false
));
}
else
{
SCH_ERR_RET
(
schExecJobImpl
(
pReq
,
pJob
,
NULL
,
false
));
SCH_RET
(
schExecStaticExplainJob
(
pReq
,
pJob
,
false
));
}
SCH_ERR_RET
(
schExecJobImpl
(
pReq
,
pJob
,
NULL
,
false
));
return
code
;
}
...
...
@@ -1549,17 +1603,24 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
SSchJob
*
pJob
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchJob
));
if
(
NULL
==
pJob
)
{
qError
(
"QID:%"
PRIx64
" calloc %d failed"
,
pReq
->
pDag
->
queryId
,
(
int32_t
)
sizeof
(
SSchJob
));
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
pReq
->
fp
(
NULL
,
pReq
->
cbParam
,
code
);
SCH_ERR_RET
(
code
);
}
pJob
->
sql
=
pReq
->
sql
;
pJob
->
attr
.
queryJob
=
true
;
pJob
->
attr
.
syncSchedule
=
sync
;
pJob
->
attr
.
explainMode
=
pReq
->
pDag
->
explainInfo
.
mode
;
pJob
->
queryId
=
pReq
->
pDag
->
queryId
;
pJob
->
subPlans
=
pReq
->
pDag
->
pSubplans
;
pJob
->
userRes
.
execFp
=
pReq
->
fp
;
pJob
->
userRes
.
userParam
=
pReq
->
cbParam
;
code
=
schBeginOperation
(
pJob
,
SCH_OP_EXEC
,
sync
);
if
(
code
)
{
pReq
->
fp
(
NULL
,
pReq
->
cbParam
,
code
);
SCH_ERR_RET
(
code
);
}
SCH_ERR_JRET
(
qExecStaticExplain
(
pReq
->
pDag
,
(
SRetrieveTableRsp
**
)
&
pJob
->
resData
));
...
...
@@ -1571,7 +1632,7 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
if
(
NULL
==
schAcquireJob
(
refId
))
{
SCH_JOB_ELOG
(
"schAcquireJob job failed, refId:%"
PRIx64
,
refId
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
SCH_ERR_
J
RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
pJob
->
refId
=
refId
;
...
...
@@ -1582,9 +1643,11 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
*
job
=
pJob
->
refId
;
SCH_JOB_DLOG
(
"job exec done, job status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
if
(
!
pJob
->
attr
.
syncSchedule
)
{
code
=
schNotifyUserQueryRes
(
pJob
);
if
(
!
sync
)
{
schPostJobRes
(
pJob
,
SCH_OP_EXEC
);
}
else
{
schEndOperation
(
pJob
);
}
schReleaseJob
(
pJob
->
refId
);
...
...
@@ -1593,56 +1656,29 @@ int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync) {
_return:
schEndOperation
(
pJob
);
if
(
!
sync
)
{
pReq
->
fp
(
NULL
,
pReq
->
cbParam
,
code
);
}
schFreeJobImpl
(
pJob
);
SCH_RET
(
code
);
}
int32_t
schFetchRows
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_DROPPING
)
{
SCH_JOB_ELOG
(
"job is dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
!
SCH_JOB_NEED_FETCH
(
pJob
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
0
,
1
)
!=
0
)
{
SCH_JOB_ELOG
(
"prior fetching not finished, userFetch:%d"
,
atomic_load_8
(
&
pJob
->
userFetch
));
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
else
if
(
status
==
JOB_TASK_STATUS_SUCCEED
)
{
SCH_JOB_DLOG
(
"job already succeed, status:%s"
,
jobTaskStatusStr
(
status
));
goto
_return
;
}
else
if
(
status
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_JOB_ELOG
(
"job status error for fetch, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
!
(
pJob
->
attr
.
explainMode
==
EXPLAIN_MODE_STATIC
))
{
SCH_ERR_JRET
(
schFetchFromRemote
(
pJob
));
tsem_wait
(
&
pJob
->
rspSem
);
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
}
SCH_ERR_JRET
(
schSetJobFetchRes
(
pJob
,
pJob
->
userRes
.
fetchRes
));
_return:
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
schEndOperation
(
pJob
);
SCH_RET
(
code
);
}
...
...
@@ -1650,50 +1686,14 @@ _return:
int32_t
schAsyncFetchRows
(
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
int8_t
status
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
status
==
JOB_TASK_STATUS_DROPPING
)
{
SCH_JOB_ELOG
(
"job is dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
!
SCH_JOB_NEED_FETCH
(
pJob
))
{
SCH_JOB_ELOG
(
"no need to fetch data, status:%s"
,
SCH_GET_JOB_STATUS_STR
(
pJob
));
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
0
,
1
)
!=
0
)
{
SCH_JOB_ELOG
(
"prior fetching not finished, userFetch:%d"
,
atomic_load_8
(
&
pJob
->
userFetch
));
SCH_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
JOB_TASK_STATUS_FAILED
==
status
||
JOB_TASK_STATUS_DROPPING
==
status
)
{
SCH_JOB_ELOG
(
"job failed or dropping, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
else
if
(
status
==
JOB_TASK_STATUS_SUCCEED
)
{
SCH_JOB_DLOG
(
"job already succeed, status:%s"
,
jobTaskStatusStr
(
status
));
goto
_return
;
}
else
if
(
status
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
SCH_JOB_ELOG
(
"job status error for fetch, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
pJob
->
attr
.
explainMode
==
EXPLAIN_MODE_STATIC
)
{
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
SCH_ERR_JRET
(
schNotifyUserFetchRes
(
pJob
));
}
else
{
pJob
->
userCb
=
SCH_FETCH_CB
;
SCH_ERR_JRET
(
schFetchFromRemote
(
pJob
));
schPostJobRes
(
pJob
,
SCH_OP_FETCH
);
return
TSDB_CODE_SUCCESS
;
}
SCH_ERR_RET
(
schFetchFromRemote
(
pJob
));
return
TSDB_CODE_SUCCESS
;
_return:
atomic_val_compare_exchange_8
(
&
pJob
->
userFetch
,
1
,
0
);
SCH_RET
(
code
);
}
source/libs/scheduler/src/schRemote.c
浏览文件 @
b3f9f81b
...
...
@@ -315,8 +315,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
return
TSDB_CODE_SUCCESS
;
}
atomic_val_compare_exchange_32
(
&
pJob
->
remoteFetch
,
1
,
0
);
SCH_ERR_JRET
(
schFetchFromRemote
(
pJob
));
taosMemoryFreeClear
(
msg
);
...
...
source/libs/scheduler/src/schUtil.c
浏览文件 @
b3f9f81b
...
...
@@ -21,6 +21,18 @@
#include "tref.h"
#include "trpc.h"
char
*
schGetOpStr
(
SCH_OP_TYPE
type
)
{
switch
(
type
)
{
case
SCH_OP_NULL
:
return
"NULL"
;
case
SCH_OP_EXEC
:
return
"EXEC"
;
case
SCH_OP_FETCH
:
return
"FETCH"
;
default:
return
"UNKNOWN"
;
}
}
void
schCleanClusterHb
(
void
*
pTrans
)
{
SCH_LOCK
(
SCH_WRITE
,
&
schMgmt
.
hbLock
);
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
b3f9f81b
...
...
@@ -78,16 +78,18 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes)
int32_t
schedulerAsyncExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
)
{
int32_t
code
=
0
;
if
(
NULL
==
pReq
||
NULL
==
pJob
)
{
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
}
else
{
code
=
schAsyncExecJob
(
pReq
,
pJob
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
schAsyncExecJob
(
pReq
,
pJob
);
_return:
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pReq
->
fp
(
NULL
,
pReq
->
cbParam
,
code
);
}
return
code
;
SCH_RET
(
code
)
;
}
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
pData
)
{
...
...
@@ -102,7 +104,8 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
pJob
->
attr
.
syncSchedule
=
true
;
SCH_ERR_RET
(
schBeginOperation
(
pJob
,
SCH_OP_FETCH
,
true
));
pJob
->
userRes
.
fetchRes
=
pData
;
code
=
schFetchRows
(
pJob
);
...
...
@@ -112,24 +115,30 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
}
void
schedulerAsyncFetchRows
(
int64_t
job
,
schedulerFetchCallback
fp
,
void
*
param
)
{
int32_t
code
=
0
;
if
(
NULL
==
fp
||
NULL
==
param
)
{
fp
(
NULL
,
param
,
TSDB_CODE_QRY_INVALID_INPUT
);
return
;
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, jobId:0x%"
PRIx64
,
job
);
fp
(
NULL
,
param
,
TSDB_CODE_SCH_STATUS_ERROR
);
return
;
qError
(
"acquire sch job from job list failed, may be dropped, jobId:0x%"
PRIx64
,
job
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
pJob
->
attr
.
syncSchedule
=
false
;
SCH_ERR_JRET
(
schBeginOperation
(
pJob
,
SCH_OP_FETCH
,
false
));
pJob
->
userRes
.
fetchFp
=
fp
;
pJob
->
userRes
.
userParam
=
param
;
/*code = */
schAsyncFetchRows
(
pJob
);
SCH_ERR_JRET
(
schAsyncFetchRows
(
pJob
));
_return:
if
(
code
)
{
fp
(
NULL
,
param
,
code
);
}
schReleaseJob
(
job
);
}
...
...
@@ -188,15 +197,17 @@ void schedulerStopQueryHb(void *pTrans) {
schCleanClusterHb
(
pTrans
);
}
void
schedulerFreeJob
(
int64_t
job
)
{
void
schedulerFreeJob
(
int64_t
job
,
int32_t
errCode
)
{
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, jobId:0x%"
PRIx64
,
job
);
return
;
}
if
(
atomic_load_8
(
&
pJob
->
userFetch
)
>
0
)
{
schProcessOnJobDropped
(
pJob
,
TSDB_CODE_QRY_JOB_FREED
);
int32_t
code
=
schProcessOnJobDropped
(
pJob
,
errCode
);
if
(
TSDB_CODE_SCH_JOB_IS_DROPPING
==
code
)
{
SCH_JOB_DLOG
(
"sch job is already dropping, refId:%"
PRIx64
,
job
);
return
;
}
SCH_JOB_DLOG
(
"start to remove job from jobRef list, refId:%"
PRIx64
,
job
);
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
b3f9f81b
...
...
@@ -457,7 +457,7 @@ void schtFreeQueryJob(int32_t freeThread) {
int64_t
job
=
queryJobRefId
;
if
(
job
&&
atomic_val_compare_exchange_64
(
&
queryJobRefId
,
job
,
0
))
{
schedulerFreeJob
(
job
);
schedulerFreeJob
(
job
,
0
);
if
(
freeThread
)
{
if
(
++
freeNum
%
schtTestPrintNum
==
0
)
{
printf
(
"FreeNum:%d
\n
"
,
freeNum
);
...
...
@@ -724,7 +724,7 @@ TEST(queryTest, normalCase) {
schReleaseJob
(
job
);
schedulerFreeJob
(
job
);
schedulerFreeJob
(
job
,
0
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -828,7 +828,7 @@ TEST(queryTest, readyFirstCase) {
schReleaseJob
(
job
);
schedulerFreeJob
(
job
);
schedulerFreeJob
(
job
,
0
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -940,7 +940,7 @@ TEST(queryTest, flowCtrlCase) {
schReleaseJob
(
job
);
schedulerFreeJob
(
job
);
schedulerFreeJob
(
job
,
0
);
schtFreeQueryDag
(
&
dag
);
...
...
@@ -994,7 +994,7 @@ TEST(insertTest, normalCase) {
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
res
.
numOfRows
,
20
);
schedulerFreeJob
(
insertJobRefId
);
schedulerFreeJob
(
insertJobRefId
,
0
);
schedulerDestroy
();
}
...
...
source/util/src/terror.c
浏览文件 @
b3f9f81b
...
...
@@ -132,6 +132,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input")
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSC_STMT_API_ERROR
,
"Stmt API usage error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSC_STMT_TBNAME_ERROR
,
"Stmt table name not set"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSC_STMT_CLAUSE_ERROR
,
"not supported stmt clause"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSC_QUERY_KILLED
,
"Query killed"
)
// mnode-common
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_APP_ERROR
,
"Mnode internal error"
)
...
...
@@ -450,6 +451,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup
TAOS_DEFINE_ERROR
(
TSDB_CODE_SCH_STATUS_ERROR
,
"scheduler status error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SCH_INTERNAL_ERROR
,
"scheduler internal error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SCH_TIMEOUT_ERROR
,
"Task timeout"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SCH_JOB_IS_DROPPING
,
"Job is dropping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QW_MSG_ERROR
,
"Invalid msg order"
)
// parser
...
...
tests/script/tsim/show/basic.sim
浏览文件 @
b3f9f81b
...
...
@@ -98,7 +98,7 @@ if $rows != 1 then
endi
#sql select * from information_schema.`streams`
sql select * from information_schema.user_tables
if $rows != 2
8
then
if $rows != 2
9
then
return -1
endi
#sql select * from information_schema.user_table_distributed
...
...
@@ -196,7 +196,7 @@ if $rows != 1 then
endi
#sql select * from performance_schema.`streams`
sql select * from information_schema.user_tables
if $rows != 2
8
then
if $rows != 2
9
then
return -1
endi
#sql select * from information_schema.user_table_distributed
...
...
@@ -210,4 +210,4 @@ if $rows != 3 then
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode2 -s stop -x SIGINT
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录