Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8b896332
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
8b896332
编写于
7月 06, 2022
作者:
D
dapan1121
提交者:
GitHub
7月 06, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14558 from taosdata/feature/insertselect
enh: refactor scheduler code
上级
78cbb515
641531bc
变更
23
展开全部
隐藏空白更改
内联
并排
Showing
23 changed file
with
1808 addition
and
1538 deletion
+1808
-1538
include/libs/qcom/query.h
include/libs/qcom/query.h
+15
-11
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+10
-32
include/util/taoserror.h
include/util/taoserror.h
+1
-1
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+39
-27
source/client/src/clientMain.c
source/client/src/clientMain.c
+6
-1
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+1
-1
source/libs/executor/src/dataInserter.c
source/libs/executor/src/dataInserter.c
+254
-0
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+8
-8
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+2
-2
source/libs/qworker/src/qwDbg.c
source/libs/qworker/src/qwDbg.c
+18
-18
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+9
-9
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+62
-22
source/libs/scheduler/src/schDbg.c
source/libs/scheduler/src/schDbg.c
+3
-3
source/libs/scheduler/src/schFlowCtrl.c
source/libs/scheduler/src/schFlowCtrl.c
+1
-1
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+301
-1163
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+59
-87
source/libs/scheduler/src/schStatus.c
source/libs/scheduler/src/schStatus.c
+94
-0
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+830
-0
source/libs/scheduler/src/schUtil.c
source/libs/scheduler/src/schUtil.c
+34
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+26
-128
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+33
-22
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/libs/qcom/query.h
浏览文件 @
8b896332
...
...
@@ -29,12 +29,13 @@ extern "C" {
typedef
enum
{
JOB_TASK_STATUS_NULL
=
0
,
JOB_TASK_STATUS_NOT_START
=
1
,
JOB_TASK_STATUS_EXECUTING
,
JOB_TASK_STATUS_PARTIAL_SUCCEED
,
JOB_TASK_STATUS_SUCCEED
,
JOB_TASK_STATUS_FAILED
,
JOB_TASK_STATUS_DROPPING
,
JOB_TASK_STATUS_INIT
,
JOB_TASK_STATUS_EXEC
,
JOB_TASK_STATUS_PART_SUCC
,
JOB_TASK_STATUS_SUCC
,
JOB_TASK_STATUS_FAIL
,
JOB_TASK_STATUS_DROP
,
JOB_TASK_STATUS_MAX
,
}
EJobTaskType
;
typedef
enum
{
...
...
@@ -59,10 +60,6 @@ typedef struct STableComInfo {
int32_t
rowSize
;
// row size of the schema
}
STableComInfo
;
typedef
struct
SQueryExecRes
{
int32_t
msgType
;
void
*
res
;
}
SQueryExecRes
;
typedef
struct
SIndexMeta
{
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
...
...
@@ -71,6 +68,13 @@ typedef struct SIndexMeta {
}
SIndexMeta
;
typedef
struct
SExecResult
{
int32_t
code
;
uint64_t
numOfRows
;
int32_t
msgType
;
void
*
res
;
}
SExecResult
;
typedef
struct
STbVerInfo
{
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
int32_t
sversion
;
...
...
@@ -210,7 +214,7 @@ char* jobTaskStatusStr(int32_t status);
SSchema
createSchema
(
int8_t
type
,
int32_t
bytes
,
col_id_t
colId
,
const
char
*
name
);
void
destroyQueryExecRes
(
S
QueryExecRes
*
pRes
);
void
destroyQueryExecRes
(
S
ExecResult
*
pRes
);
int32_t
dataConverToStr
(
char
*
str
,
int
type
,
void
*
buf
,
int32_t
bufSize
,
int32_t
*
len
);
char
*
parseTagDatatoJson
(
void
*
p
);
int32_t
cloneTableMeta
(
STableMeta
*
pSrc
,
STableMeta
**
pDst
);
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
8b896332
...
...
@@ -53,12 +53,6 @@ typedef struct SQueryProfileSummary {
uint64_t
resultSize
;
// generated result size in Kb.
}
SQueryProfileSummary
;
typedef
struct
SQueryResult
{
int32_t
code
;
uint64_t
numOfRows
;
SQueryExecRes
res
;
}
SQueryResult
;
typedef
struct
STaskInfo
{
SQueryNodeAddr
addr
;
SSubQueryMsg
*
msg
;
...
...
@@ -69,50 +63,34 @@ typedef struct SSchdFetchParam {
int32_t
*
code
;
}
SSchdFetchParam
;
typedef
void
(
*
schedulerExecFp
)(
S
Query
Result
*
pResult
,
void
*
param
,
int32_t
code
);
typedef
void
(
*
schedulerExecFp
)(
S
Exec
Result
*
pResult
,
void
*
param
,
int32_t
code
);
typedef
void
(
*
schedulerFetchFp
)(
void
*
pResult
,
void
*
param
,
int32_t
code
);
typedef
bool
(
*
schedulerChkKillFp
)(
void
*
param
);
typedef
struct
SSchedulerReq
{
bool
syncReq
;
SRequestConnInfo
*
pConn
;
SArray
*
pNodeList
;
SQueryPlan
*
pDag
;
const
char
*
sql
;
int64_t
startTs
;
schedulerExecFp
execFp
;
void
*
execParam
;
schedulerFetchFp
fetchFp
;
void
*
cbParam
;
schedulerChkKillFp
chkKillFp
;
void
*
chkKillParam
;
SExecResult
*
pExecRes
;
void
**
pFetchRes
;
}
SSchedulerReq
;
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
);
/**
* Process the query job, generated according to the query physical plan.
* This is a synchronized API, and is also thread-safety.
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t
schedulerExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
,
SQueryResult
*
pRes
);
/**
* Process the query job, generated according to the query physical plan.
* This is a asynchronized API, and is also thread-safety.
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t
schedulerAsyncExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
);
int32_t
schedulerExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
);
/**
* Fetch query result from the remote query executor
* @param pJob
* @param data
* @return
*/
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
data
);
int32_t
schedulerFetchRows
(
int64_t
jobId
,
SSchedulerReq
*
pReq
);
void
scheduler
AsyncFetchRows
(
int64_t
job
,
schedulerFetchFp
fp
,
void
*
param
);
void
scheduler
FetchRowsA
(
int64_t
job
,
schedulerFetchFp
fp
,
void
*
param
);
int32_t
schedulerGetTasksStatus
(
int64_t
job
,
SArray
*
pSub
);
...
...
@@ -134,7 +112,7 @@ void schedulerFreeJob(int64_t* job, int32_t errCode);
void
schedulerDestroy
(
void
);
void
schdExecCallback
(
S
Query
Result
*
pResult
,
void
*
param
,
int32_t
code
);
void
schdExecCallback
(
S
Exec
Result
*
pResult
,
void
*
param
,
int32_t
code
);
#ifdef __cplusplus
}
...
...
include/util/taoserror.h
浏览文件 @
8b896332
...
...
@@ -389,10 +389,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719)
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A)
#define TSDB_CODE_QRY_TASK_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x071B)
//json
#define TSDB_CODE_QRY_JSON_IN_ERROR TAOS_DEF_ERROR_CODE(0, 0x071C)
#define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x071D)
#define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x071E)
#define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x071F)
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)
...
...
source/client/inc/clientInt.h
浏览文件 @
8b896332
...
...
@@ -156,7 +156,7 @@ typedef struct SResultColumn {
}
SResultColumn
;
typedef
struct
SReqResultInfo
{
S
QueryExecRes
execRes
;
S
ExecResult
execRes
;
const
char
*
pRspMsg
;
const
char
*
pData
;
TAOS_FIELD
*
fields
;
// todo, column names are not needed.
...
...
source/client/src/clientImpl.c
浏览文件 @
8b896332
...
...
@@ -627,22 +627,26 @@ _return:
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryPlan
*
pDag
,
SArray
*
pNodeList
)
{
void
*
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
;
S
Query
Result
res
=
{
0
};
S
Exec
Result
res
=
{
0
};
SRequestConnInfo
conn
=
{.
pTrans
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
};
SSchedulerReq
req
=
{.
pConn
=
&
conn
,
.
pNodeList
=
pNodeList
,
.
pDag
=
pDag
,
.
sql
=
pRequest
->
sqlstr
,
.
startTs
=
pRequest
->
metric
.
start
,
.
execFp
=
NULL
,
.
execParam
=
NULL
,
.
chkKillFp
=
chkRequestKilled
,
.
chkKillParam
=
(
void
*
)
pRequest
->
self
};
int32_t
code
=
schedulerExecJob
(
&
req
,
&
pRequest
->
body
.
queryJob
,
&
res
);
pRequest
->
body
.
resInfo
.
execRes
=
res
.
res
;
SSchedulerReq
req
=
{
.
syncReq
=
true
,
.
pConn
=
&
conn
,
.
pNodeList
=
pNodeList
,
.
pDag
=
pDag
,
.
sql
=
pRequest
->
sqlstr
,
.
startTs
=
pRequest
->
metric
.
start
,
.
execFp
=
NULL
,
.
cbParam
=
NULL
,
.
chkKillFp
=
chkRequestKilled
,
.
chkKillParam
=
(
void
*
)
pRequest
->
self
,
.
pExecRes
=
&
res
,
};
int32_t
code
=
schedulerExecJob
(
&
req
,
&
pRequest
->
body
.
queryJob
);
memcpy
(
&
pRequest
->
body
.
resInfo
.
execRes
,
&
res
,
sizeof
(
res
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
schedulerFreeJob
(
&
pRequest
->
body
.
queryJob
,
0
);
...
...
@@ -753,7 +757,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
}
SEpSet
epset
=
getEpSet_s
(
&
pAppInfo
->
mgmtEp
);
S
QueryExecRes
*
pRes
=
&
pRequest
->
body
.
resInfo
.
execRes
;
S
ExecResult
*
pRes
=
&
pRequest
->
body
.
resInfo
.
execRes
;
switch
(
pRes
->
msgType
)
{
case
TDMT_VND_ALTER_TABLE
:
...
...
@@ -779,10 +783,10 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
return
code
;
}
void
schedulerExecCb
(
S
Query
Result
*
pResult
,
void
*
param
,
int32_t
code
)
{
void
schedulerExecCb
(
S
Exec
Result
*
pResult
,
void
*
param
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
param
;
pRequest
->
code
=
code
;
pRequest
->
body
.
resInfo
.
execRes
=
pResult
->
res
;
memcpy
(
&
pRequest
->
body
.
resInfo
.
execRes
,
pResult
,
sizeof
(
*
pResult
))
;
if
(
TDMT_VND_SUBMIT
==
pRequest
->
type
||
TDMT_VND_DELETE
==
pRequest
->
type
||
TDMT_VND_CREATE_TABLE
==
pRequest
->
type
)
{
...
...
@@ -939,16 +943,20 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
SRequestConnInfo
conn
=
{
.
pTrans
=
pAppInfo
->
pTransporter
,
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
};
SSchedulerReq
req
=
{.
pConn
=
&
conn
,
.
pNodeList
=
pNodeList
,
.
pDag
=
pDag
,
.
sql
=
pRequest
->
sqlstr
,
.
startTs
=
pRequest
->
metric
.
start
,
.
execFp
=
schedulerExecCb
,
.
execParam
=
pRequest
,
.
chkKillFp
=
chkRequestKilled
,
.
chkKillParam
=
(
void
*
)
pRequest
->
self
};
code
=
schedulerAsyncExecJob
(
&
req
,
&
pRequest
->
body
.
queryJob
);
SSchedulerReq
req
=
{
.
syncReq
=
false
,
.
pConn
=
&
conn
,
.
pNodeList
=
pNodeList
,
.
pDag
=
pDag
,
.
sql
=
pRequest
->
sqlstr
,
.
startTs
=
pRequest
->
metric
.
start
,
.
execFp
=
schedulerExecCb
,
.
cbParam
=
pRequest
,
.
chkKillFp
=
chkRequestKilled
,
.
chkKillParam
=
(
void
*
)
pRequest
->
self
,
.
pExecRes
=
NULL
,
};
code
=
schedulerExecJob
(
&
req
,
&
pRequest
->
body
.
queryJob
);
taosArrayDestroy
(
pNodeList
);
}
else
{
tscDebug
(
"0x%"
PRIx64
" plan not executed, code:%s 0x%"
PRIx64
,
pRequest
->
self
,
tstrerror
(
code
),
...
...
@@ -1387,7 +1395,11 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
}
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
pRequest
->
code
=
schedulerFetchRows
(
pRequest
->
body
.
queryJob
,
(
void
**
)
&
pResInfo
->
pData
);
SSchedulerReq
req
=
{
.
syncReq
=
true
,
.
pFetchRes
=
(
void
**
)
&
pResInfo
->
pData
,
};
pRequest
->
code
=
schedulerFetchRows
(
pRequest
->
body
.
queryJob
,
&
req
);
if
(
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
pResultInfo
->
numOfRows
=
0
;
return
NULL
;
...
...
source/client/src/clientMain.c
浏览文件 @
8b896332
...
...
@@ -858,7 +858,12 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
}
}
schedulerAsyncFetchRows
(
pRequest
->
body
.
queryJob
,
fetchCallback
,
pRequest
);
SSchedulerReq
req
=
{
.
syncReq
=
false
,
.
fetchFp
=
fetchCallback
,
.
cbParam
=
pRequest
,
};
schedulerFetchRows
(
pRequest
->
body
.
queryJob
,
&
req
);
}
void
taos_fetch_raw_block_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
)
{
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
8b896332
...
...
@@ -266,7 +266,7 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
}
if
(
pRequest
->
body
.
queryFp
!=
NULL
)
{
S
QueryExecRes
*
pRes
=
&
pRequest
->
body
.
resInfo
.
execRes
;
S
ExecResult
*
pRes
=
&
pRequest
->
body
.
resInfo
.
execRes
;
if
(
code
==
TSDB_CODE_SUCCESS
)
{
SCatalog
*
pCatalog
=
NULL
;
...
...
source/libs/executor/src/dataInserter.c
0 → 100644
浏览文件 @
8b896332
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tqueue.h"
extern
SDataSinkStat
gDataSinkStat
;
typedef
struct
SDataInserterBuf
{
int32_t
useSize
;
int32_t
allocSize
;
char
*
pData
;
}
SDataInserterBuf
;
typedef
struct
SDataCacheEntry
{
int32_t
dataLen
;
int32_t
numOfRows
;
int32_t
numOfCols
;
int8_t
compressed
;
char
data
[];
}
SDataCacheEntry
;
typedef
struct
SDataInserterHandle
{
SDataSinkHandle
sink
;
SDataSinkManager
*
pManager
;
SDataBlockDescNode
*
pSchema
;
SDataDeleterNode
*
pDeleter
;
SDeleterParam
*
pParam
;
STaosQueue
*
pDataBlocks
;
SDataInserterBuf
nextOutput
;
int32_t
status
;
bool
queryEnd
;
uint64_t
useconds
;
uint64_t
cachedSize
;
TdThreadMutex
mutex
;
}
SDataInserterHandle
;
static
bool
needCompress
(
const
SSDataBlock
*
pData
,
int32_t
numOfCols
)
{
if
(
tsCompressColData
<
0
||
0
==
pData
->
info
.
rows
)
{
return
false
;
}
for
(
int32_t
col
=
0
;
col
<
numOfCols
;
++
col
)
{
SColumnInfoData
*
pColRes
=
taosArrayGet
(
pData
->
pDataBlock
,
col
);
int32_t
colSize
=
pColRes
->
info
.
bytes
*
pData
->
info
.
rows
;
if
(
NEEDTO_COMPRESS_QUERY
(
colSize
))
{
return
true
;
}
}
return
false
;
}
static
void
toDataCacheEntry
(
SDataInserterHandle
*
pHandle
,
const
SInputData
*
pInput
,
SDataInserterBuf
*
pBuf
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pHandle
->
pSchema
->
pSlots
);
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)
pBuf
->
pData
;
pEntry
->
compressed
=
0
;
pEntry
->
numOfRows
=
pInput
->
pData
->
info
.
rows
;
pEntry
->
numOfCols
=
taosArrayGetSize
(
pInput
->
pData
->
pDataBlock
);
pEntry
->
dataLen
=
sizeof
(
SDeleterRes
);
ASSERT
(
1
==
pEntry
->
numOfRows
);
ASSERT
(
1
==
pEntry
->
numOfCols
);
pBuf
->
useSize
=
sizeof
(
SDataCacheEntry
);
SColumnInfoData
*
pColRes
=
(
SColumnInfoData
*
)
taosArrayGet
(
pInput
->
pData
->
pDataBlock
,
0
);
SDeleterRes
*
pRes
=
(
SDeleterRes
*
)
pEntry
->
data
;
pRes
->
suid
=
pHandle
->
pParam
->
suid
;
pRes
->
uidList
=
pHandle
->
pParam
->
pUidList
;
pRes
->
skey
=
pHandle
->
pDeleter
->
deleteTimeRange
.
skey
;
pRes
->
ekey
=
pHandle
->
pDeleter
->
deleteTimeRange
.
ekey
;
pRes
->
affectedRows
=
*
(
int64_t
*
)
pColRes
->
pData
;
pBuf
->
useSize
+=
pEntry
->
dataLen
;
atomic_add_fetch_64
(
&
pHandle
->
cachedSize
,
pEntry
->
dataLen
);
atomic_add_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
}
static
bool
allocBuf
(
SDataInserterHandle
*
pDeleter
,
const
SInputData
*
pInput
,
SDataInserterBuf
*
pBuf
)
{
uint32_t
capacity
=
pDeleter
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
;
if
(
taosQueueItemSize
(
pDeleter
->
pDataBlocks
)
>
capacity
)
{
qError
(
"SinkNode queue is full, no capacity, max:%d, current:%d, no capacity"
,
capacity
,
taosQueueItemSize
(
pDeleter
->
pDataBlocks
));
return
false
;
}
pBuf
->
allocSize
=
sizeof
(
SDataCacheEntry
)
+
sizeof
(
SDeleterRes
);
pBuf
->
pData
=
taosMemoryMalloc
(
pBuf
->
allocSize
);
if
(
pBuf
->
pData
==
NULL
)
{
qError
(
"SinkNode failed to malloc memory, size:%d, code:%d"
,
pBuf
->
allocSize
,
TAOS_SYSTEM_ERROR
(
errno
));
}
return
NULL
!=
pBuf
->
pData
;
}
static
int32_t
updateStatus
(
SDataInserterHandle
*
pDeleter
)
{
taosThreadMutexLock
(
&
pDeleter
->
mutex
);
int32_t
blockNums
=
taosQueueItemSize
(
pDeleter
->
pDataBlocks
);
int32_t
status
=
(
0
==
blockNums
?
DS_BUF_EMPTY
:
(
blockNums
<
pDeleter
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
?
DS_BUF_LOW
:
DS_BUF_FULL
));
pDeleter
->
status
=
status
;
taosThreadMutexUnlock
(
&
pDeleter
->
mutex
);
return
status
;
}
static
int32_t
getStatus
(
SDataInserterHandle
*
pDeleter
)
{
taosThreadMutexLock
(
&
pDeleter
->
mutex
);
int32_t
status
=
pDeleter
->
status
;
taosThreadMutexUnlock
(
&
pDeleter
->
mutex
);
return
status
;
}
static
int32_t
putDataBlock
(
SDataSinkHandle
*
pHandle
,
const
SInputData
*
pInput
,
bool
*
pContinue
)
{
SDataInserterHandle
*
pDeleter
=
(
SDataInserterHandle
*
)
pHandle
;
SDataInserterBuf
*
pBuf
=
taosAllocateQitem
(
sizeof
(
SDataInserterBuf
),
DEF_QITEM
);
if
(
NULL
==
pBuf
||
!
allocBuf
(
pDeleter
,
pInput
,
pBuf
))
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
toDataCacheEntry
(
pDeleter
,
pInput
,
pBuf
);
taosWriteQitem
(
pDeleter
->
pDataBlocks
,
pBuf
);
*
pContinue
=
(
DS_BUF_LOW
==
updateStatus
(
pDeleter
)
?
true
:
false
);
return
TSDB_CODE_SUCCESS
;
}
static
void
endPut
(
struct
SDataSinkHandle
*
pHandle
,
uint64_t
useconds
)
{
SDataInserterHandle
*
pDeleter
=
(
SDataInserterHandle
*
)
pHandle
;
taosThreadMutexLock
(
&
pDeleter
->
mutex
);
pDeleter
->
queryEnd
=
true
;
pDeleter
->
useconds
=
useconds
;
taosThreadMutexUnlock
(
&
pDeleter
->
mutex
);
}
static
void
getDataLength
(
SDataSinkHandle
*
pHandle
,
int32_t
*
pLen
,
bool
*
pQueryEnd
)
{
SDataInserterHandle
*
pDeleter
=
(
SDataInserterHandle
*
)
pHandle
;
if
(
taosQueueEmpty
(
pDeleter
->
pDataBlocks
))
{
*
pQueryEnd
=
pDeleter
->
queryEnd
;
*
pLen
=
0
;
return
;
}
SDataInserterBuf
*
pBuf
=
NULL
;
taosReadQitem
(
pDeleter
->
pDataBlocks
,
(
void
**
)
&
pBuf
);
memcpy
(
&
pDeleter
->
nextOutput
,
pBuf
,
sizeof
(
SDataInserterBuf
));
taosFreeQitem
(
pBuf
);
*
pLen
=
((
SDataCacheEntry
*
)(
pDeleter
->
nextOutput
.
pData
))
->
dataLen
;
*
pQueryEnd
=
pDeleter
->
queryEnd
;
qDebug
(
"got data len %d, row num %d in sink"
,
*
pLen
,
((
SDataCacheEntry
*
)(
pDeleter
->
nextOutput
.
pData
))
->
numOfRows
);
}
static
int32_t
getDataBlock
(
SDataSinkHandle
*
pHandle
,
SOutputData
*
pOutput
)
{
SDataInserterHandle
*
pDeleter
=
(
SDataInserterHandle
*
)
pHandle
;
if
(
NULL
==
pDeleter
->
nextOutput
.
pData
)
{
assert
(
pDeleter
->
queryEnd
);
pOutput
->
useconds
=
pDeleter
->
useconds
;
pOutput
->
precision
=
pDeleter
->
pSchema
->
precision
;
pOutput
->
bufStatus
=
DS_BUF_EMPTY
;
pOutput
->
queryEnd
=
pDeleter
->
queryEnd
;
return
TSDB_CODE_SUCCESS
;
}
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)(
pDeleter
->
nextOutput
.
pData
);
memcpy
(
pOutput
->
pData
,
pEntry
->
data
,
pEntry
->
dataLen
);
pOutput
->
numOfRows
=
pEntry
->
numOfRows
;
pOutput
->
numOfCols
=
pEntry
->
numOfCols
;
pOutput
->
compressed
=
pEntry
->
compressed
;
atomic_sub_fetch_64
(
&
pDeleter
->
cachedSize
,
pEntry
->
dataLen
);
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
taosMemoryFreeClear
(
pDeleter
->
nextOutput
.
pData
);
// todo persistent
pOutput
->
bufStatus
=
updateStatus
(
pDeleter
);
taosThreadMutexLock
(
&
pDeleter
->
mutex
);
pOutput
->
queryEnd
=
pDeleter
->
queryEnd
;
pOutput
->
useconds
=
pDeleter
->
useconds
;
pOutput
->
precision
=
pDeleter
->
pSchema
->
precision
;
taosThreadMutexUnlock
(
&
pDeleter
->
mutex
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
destroyDataSinker
(
SDataSinkHandle
*
pHandle
)
{
SDataInserterHandle
*
pDeleter
=
(
SDataInserterHandle
*
)
pHandle
;
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pDeleter
->
cachedSize
);
taosMemoryFreeClear
(
pDeleter
->
nextOutput
.
pData
);
while
(
!
taosQueueEmpty
(
pDeleter
->
pDataBlocks
))
{
SDataInserterBuf
*
pBuf
=
NULL
;
taosReadQitem
(
pDeleter
->
pDataBlocks
,
(
void
**
)
&
pBuf
);
taosMemoryFreeClear
(
pBuf
->
pData
);
taosFreeQitem
(
pBuf
);
}
taosCloseQueue
(
pDeleter
->
pDataBlocks
);
taosThreadMutexDestroy
(
&
pDeleter
->
mutex
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
getCacheSize
(
struct
SDataSinkHandle
*
pHandle
,
uint64_t
*
size
)
{
SDataInserterHandle
*
pDispatcher
=
(
SDataInserterHandle
*
)
pHandle
;
*
size
=
atomic_load_64
(
&
pDispatcher
->
cachedSize
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
createDataInserter
(
SDataSinkManager
*
pManager
,
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
)
{
SDataInserterHandle
*
inserter
=
taosMemoryCalloc
(
1
,
sizeof
(
SDataInserterHandle
));
if
(
NULL
==
inserter
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
SDataDeleterNode
*
pDeleterNode
=
(
SDataDeleterNode
*
)
pDataSink
;
inserter
->
sink
.
fPut
=
putDataBlock
;
inserter
->
sink
.
fEndPut
=
endPut
;
inserter
->
sink
.
fGetLen
=
getDataLength
;
inserter
->
sink
.
fGetData
=
getDataBlock
;
inserter
->
sink
.
fDestroy
=
destroyDataSinker
;
inserter
->
sink
.
fGetCacheSize
=
getCacheSize
;
inserter
->
pManager
=
pManager
;
inserter
->
pDeleter
=
pDeleterNode
;
inserter
->
pSchema
=
pDataSink
->
pInputDataBlockDesc
;
inserter
->
pParam
=
pParam
;
inserter
->
status
=
DS_BUF_EMPTY
;
inserter
->
queryEnd
=
false
;
inserter
->
pDataBlocks
=
taosOpenQueue
();
taosThreadMutexInit
(
&
inserter
->
mutex
,
NULL
);
if
(
NULL
==
inserter
->
pDataBlocks
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
*
pHandle
=
inserter
;
return
TSDB_CODE_SUCCESS
;
}
source/libs/qcom/src/queryUtil.c
浏览文件 @
8b896332
...
...
@@ -171,17 +171,17 @@ char* jobTaskStatusStr(int32_t status) {
switch
(
status
)
{
case
JOB_TASK_STATUS_NULL
:
return
"NULL"
;
case
JOB_TASK_STATUS_
NOT_STAR
T
:
return
"
NOT_STAR
T"
;
case
JOB_TASK_STATUS_EXEC
UTING
:
case
JOB_TASK_STATUS_
INI
T
:
return
"
INI
T"
;
case
JOB_TASK_STATUS_EXEC
:
return
"EXECUTING"
;
case
JOB_TASK_STATUS_PART
IAL_SUCCEED
:
case
JOB_TASK_STATUS_PART
_SUCC
:
return
"PARTIAL_SUCCEED"
;
case
JOB_TASK_STATUS_SUCC
EED
:
case
JOB_TASK_STATUS_SUCC
:
return
"SUCCEED"
;
case
JOB_TASK_STATUS_FAIL
ED
:
case
JOB_TASK_STATUS_FAIL
:
return
"FAILED"
;
case
JOB_TASK_STATUS_DROP
PING
:
case
JOB_TASK_STATUS_DROP
:
return
"DROPPING"
;
default:
break
;
...
...
@@ -200,7 +200,7 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam
return
s
;
}
void
destroyQueryExecRes
(
S
QueryExecRes
*
pRes
)
{
void
destroyQueryExecRes
(
S
ExecResult
*
pRes
)
{
if
(
NULL
==
pRes
||
NULL
==
pRes
->
res
)
{
return
;
}
...
...
source/libs/qworker/inc/qwInt.h
浏览文件 @
8b896332
...
...
@@ -226,8 +226,8 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY(status) \
(status == JOB_TASK_STATUS_SUCC
EED || status == JOB_TASK_STATUS_FAILED
|| status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PART
IAL_SUCCEED
)
(status == JOB_TASK_STATUS_SUCC
|| status == JOB_TASK_STATUS_FAIL
|| status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PART
_SUCC
)
#define QW_SET_QTID(id, qId, tId, eId) \
do { \
*(uint64_t *)(id) = (qId); \
...
...
source/libs/qworker/src/qwDbg.c
浏览文件 @
8b896332
...
...
@@ -19,7 +19,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
int32_t
code
=
0
;
if
(
oriStatus
==
newStatus
)
{
if
(
newStatus
==
JOB_TASK_STATUS_EXEC
UTING
||
newStatus
==
JOB_TASK_STATUS_FAILED
)
{
if
(
newStatus
==
JOB_TASK_STATUS_EXEC
||
newStatus
==
JOB_TASK_STATUS_FAIL
)
{
*
ignore
=
true
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -29,47 +29,47 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
switch
(
oriStatus
)
{
case
JOB_TASK_STATUS_NULL
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXEC
UTING
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
&&
newStatus
!=
JOB_TASK_STATUS_
NOT_STAR
T
)
{
if
(
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_
INI
T
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_
NOT_STAR
T
:
if
(
newStatus
!=
JOB_TASK_STATUS_DROP
PING
&&
newStatus
!=
JOB_TASK_STATUS_EXECUTING
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
ED
)
{
case
JOB_TASK_STATUS_
INI
T
:
if
(
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_EXEC
UTING
:
if
(
newStatus
!=
JOB_TASK_STATUS_PART
IAL_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
ED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
case
JOB_TASK_STATUS_EXEC
:
if
(
newStatus
!=
JOB_TASK_STATUS_PART
_SUCC
&&
newStatus
!=
JOB_TASK_STATUS_SUCC
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_DROP
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_PART
IAL_SUCCEED
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXEC
UTING
&&
newStatus
!=
JOB_TASK_STATUS_SUCCEED
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
ED
&&
newStatus
!=
JOB_TASK_STATUS_DROPPING
)
{
case
JOB_TASK_STATUS_PART
_SUCC
:
if
(
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_SUCC
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_DROP
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_SUCC
EED
:
if
(
newStatus
!=
JOB_TASK_STATUS_DROP
PING
&&
newStatus
!=
JOB_TASK_STATUS_FAILED
)
{
case
JOB_TASK_STATUS_SUCC
:
if
(
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_FAIL
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_FAIL
ED
:
if
(
newStatus
!=
JOB_TASK_STATUS_DROP
PING
)
{
case
JOB_TASK_STATUS_FAIL
:
if
(
newStatus
!=
JOB_TASK_STATUS_DROP
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_DROP
PING
:
if
(
newStatus
!=
JOB_TASK_STATUS_FAIL
ED
&&
newStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
case
JOB_TASK_STATUS_DROP
:
if
(
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_PART_SUCC
)
{
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
break
;
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
8b896332
...
...
@@ -206,7 +206,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_TASK_DLOG_E
(
"no data in sink and query end"
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_SUCC
EED
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_SUCC
);
QW_ERR_RET
(
qwMallocFetchRsp
(
len
,
&
rsp
));
*
rspMsg
=
rsp
;
...
...
@@ -236,7 +236,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if
(
DS_BUF_EMPTY
==
pOutput
->
bufStatus
&&
pOutput
->
queryEnd
)
{
QW_TASK_DLOG_E
(
"task all data fetched, done"
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_SUCC
EED
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_SUCC
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -330,7 +330,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
break
;
}
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXEC
UTING
));
QW_ERR_JRET
(
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXEC
));
break
;
}
case
QW_PHASE_PRE_FETCH
:
{
...
...
@@ -447,7 +447,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
_return:
if
(
TSDB_CODE_SUCCESS
==
code
&&
QW_PHASE_POST_QUERY
==
phase
)
{
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_PART
IAL_SUCCEED
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_PART
_SUCC
);
}
if
(
rspConnection
)
{
...
...
@@ -467,7 +467,7 @@ _return:
}
if
(
code
)
{
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_FAIL
ED
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_FAIL
);
}
QW_TASK_DLOG
(
"end to handle event at phase %s, code:%x - %s"
,
qwPhaseStr
(
phase
),
code
,
tstrerror
(
code
));
...
...
@@ -499,7 +499,7 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx
->
ctrlConnInfo
=
qwMsg
->
connInfo
;
QW_ERR_JRET
(
qwAddTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_
NOT_STAR
T
));
QW_ERR_JRET
(
qwAddTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_
INI
T
));
_return:
...
...
@@ -698,7 +698,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if
(
QW_IS_QUERY_RUNNING
(
ctx
))
{
atomic_store_8
((
int8_t
*
)
&
ctx
->
queryContinue
,
1
);
}
else
if
(
0
==
atomic_load_8
((
int8_t
*
)
&
ctx
->
queryInQueue
))
{
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXEC
UTING
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_EXEC
);
atomic_store_8
((
int8_t
*
)
&
ctx
->
queryInQueue
,
1
);
...
...
@@ -749,7 +749,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if
(
QW_IS_QUERY_RUNNING
(
ctx
))
{
QW_ERR_JRET
(
qwKillTaskHandle
(
QW_FPARAMS
(),
ctx
));
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_DROP
PING
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_DROP
);
}
else
if
(
ctx
->
phase
>
0
)
{
QW_ERR_JRET
(
qwDropTask
(
QW_FPARAMS
()));
rsped
=
true
;
...
...
@@ -770,7 +770,7 @@ _return:
QW_UPDATE_RSP_CODE
(
ctx
,
code
);
}
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_FAIL
ED
);
qwUpdateTaskStatus
(
QW_FPARAMS
(),
JOB_TASK_STATUS_FAIL
);
}
if
(
locked
)
{
...
...
source/libs/scheduler/inc/sch
eduler
Int.h
→
source/libs/scheduler/inc/schInt.h
浏览文件 @
8b896332
...
...
@@ -52,6 +52,7 @@ typedef enum {
SCH_OP_NULL
=
0
,
SCH_OP_EXEC
,
SCH_OP_FETCH
,
SCH_OP_GET_STATUS
,
}
SCH_OP_TYPE
;
typedef
struct
SSchTrans
{
...
...
@@ -97,13 +98,30 @@ typedef struct SSchStat {
}
SSchStat
;
typedef
struct
SSchResInfo
{
S
QueryResult
*
query
Res
;
S
ExecResult
*
exec
Res
;
void
**
fetchRes
;
schedulerExecFp
execFp
;
schedulerFetchFp
fetchFp
;
void
*
user
Param
;
void
*
cb
Param
;
}
SSchResInfo
;
typedef
struct
SSchOpEvent
{
SCH_OP_TYPE
type
;
bool
begin
;
SSchedulerReq
*
pReq
;
}
SSchOpEvent
;
typedef
int32_t
(
*
schStatusEnterFp
)(
void
*
pHandle
,
void
*
pParam
);
typedef
int32_t
(
*
schStatusLeaveFp
)(
void
*
pHandle
,
void
*
pParam
);
typedef
int32_t
(
*
schStatusEventFp
)(
void
*
pHandle
,
void
*
pParam
,
void
*
pEvent
);
typedef
struct
SSchStatusFps
{
EJobTaskType
status
;
schStatusEnterFp
enterFp
;
schStatusLeaveFp
leaveFp
;
schStatusEventFp
eventFp
;
}
SSchStatusFps
;
typedef
struct
SSchedulerMgmt
{
uint64_t
taskId
;
// sequential taksId
uint64_t
sId
;
// schedulerId
...
...
@@ -157,7 +175,7 @@ typedef struct SSchLevel {
int32_t
taskNum
;
int32_t
taskLaunchedNum
;
int32_t
taskDoneNum
;
SArray
*
subTasks
;
// Element is S
Query
Task
SArray
*
subTasks
;
// Element is S
Sch
Task
}
SSchLevel
;
typedef
struct
SSchTaskProfile
{
...
...
@@ -195,12 +213,13 @@ typedef struct SSchTask {
typedef
struct
SSchJobAttr
{
EExplainMode
explainMode
;
bool
queryJob
;
bool
needFetch
;
bool
needFlowCtrl
;
}
SSchJobAttr
;
typedef
struct
{
int32_t
op
;
bool
sync
;
bool
sync
Req
;
}
SSchOpStatus
;
typedef
struct
SSchJob
{
...
...
@@ -231,7 +250,7 @@ typedef struct SSchJob {
SSchTask
*
fetchTask
;
int32_t
errCode
;
SRWLatch
resLock
;
S
QueryExecRes
execRes
;
S
ExecResult
execRes
;
void
*
resData
;
//TODO free it or not
int32_t
resNumOfRows
;
SSchResInfo
userRes
;
...
...
@@ -292,19 +311,19 @@ extern SSchedulerMgmt schMgmt;
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.sync)
#define SCH_JOB_IN_ASYNC_EXEC_OP(job) ((
(job)->opStatus.op == SCH_OP_EXEC) && (!(job)->opStatus.sync
))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job) ((
(job)->opStatus.op == SCH_OP_FETCH) && (!(job)->opStatus.sync
))
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.sync
Req
)
#define SCH_JOB_IN_ASYNC_EXEC_OP(job) ((
SCH_OP_EXEC == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_EXEC, SCH_OP_NULL)) && (!(job)->opStatus.syncReq
))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job) ((
SCH_OP_FETCH == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_FETCH, SCH_OP_NULL)) && (!(job)->opStatus.syncReq
))
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_SET_JOB_TYPE(_job, type)
(_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY
)
#define SCH_SET_JOB_TYPE(_job, type)
do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0
)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_JOB_NEED_FETCH(_job)
SCH_IS_QUERY_JOB(_job
)
#define SCH_
IS_WAIT_ALL_JOB
(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_
IS_NEED_DROP_JOB
(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_FETCH(_job)
((_job)->attr.needFetch
)
#define SCH_
JOB_NEED_WAIT
(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_
JOB_NEED_DROP
(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
...
...
@@ -329,9 +348,10 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define SCH_SET_ERRNO(_err) do { if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { terrno = (_err); } } while (0)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0)
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
...
...
@@ -349,7 +369,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t
schCheckIncTaskFlowQuota
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
*
enough
);
int32_t
schLaunchTasksInFlowCtrlList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schLaunchTaskImpl
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
sch
FetchFromRemote
(
SSchJob
*
pJob
);
int32_t
sch
LaunchFetchTask
(
SSchJob
*
pJob
);
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
);
int32_t
schBuildAndSendHbMsg
(
SQueryNodeEpId
*
nodeEpId
,
SArray
*
taskAction
);
int32_t
schCloneSMsgSendInfo
(
void
*
src
,
void
**
dst
);
...
...
@@ -371,25 +391,45 @@ void schFreeRpcCtxVal(const void *arg);
int32_t
schMakeBrokenLinkVal
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcBrokenlinkVal
*
brokenVal
,
bool
isHb
);
int32_t
schAppendTaskExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryNodeAddr
*
addr
,
int32_t
execId
);
int32_t
schExecStaticExplainJob
(
SSchedulerReq
*
pReq
,
int64_t
*
job
,
bool
sync
);
int32_t
schExecJobImpl
(
SSchedulerReq
*
pReq
,
SSchJob
*
pJob
,
bool
sync
);
int32_t
schUpdateJobStatus
(
SSchJob
*
pJob
,
int8_t
newStatus
);
int32_t
schCancelJob
(
SSchJob
*
pJob
);
int32_t
schProcessOnJobDropped
(
SSchJob
*
pJob
,
int32_t
errCode
);
uint64_t
schGenTaskId
(
void
);
void
schCloseJobRef
(
void
);
int32_t
schExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
,
SQueryResult
*
pRes
);
int32_t
schAsyncExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJob
);
int32_t
schFetchRows
(
SSchJob
*
pJob
);
int32_t
sch
AsyncFetchRows
(
SSchJob
*
pJob
);
int32_t
sch
Job
FetchRows
(
SSchJob
*
pJob
);
int32_t
sch
JobFetchRowsA
(
SSchJob
*
pJob
);
int32_t
schUpdateTaskHandle
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
dropExecNode
,
void
*
handle
,
int32_t
execId
);
int32_t
schProcessOnTaskStatusRsp
(
SQueryNodeEpId
*
pEpId
,
SArray
*
pStatusList
);
void
schFreeSMsgSendInfo
(
SMsgSendInfo
*
msgSendInfo
);
char
*
schGetOpStr
(
SCH_OP_TYPE
type
);
int32_t
schBeginOperation
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
bool
sync
);
int32_t
schInitJob
(
SSchedulerReq
*
pReq
,
SSchJob
**
pSchJob
);
int32_t
schSetJobQueryRes
(
SSchJob
*
pJob
,
SQueryResult
*
pRes
);
int32_t
schInitJob
(
int64_t
*
pJobId
,
SSchedulerReq
*
pReq
);
int32_t
schExecJob
(
SSchJob
*
pJob
,
SSchedulerReq
*
pReq
);
int32_t
schDumpJobExecRes
(
SSchJob
*
pJob
,
SExecResult
*
pRes
);
int32_t
schUpdateTaskCandidateAddr
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SEpSet
*
pEpSet
);
int32_t
schHandleRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
);
void
schProcessOnOpEnd
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
,
int32_t
errCode
);
int32_t
schProcessOnOpBegin
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
);
void
schProcessOnCbEnd
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
);
int32_t
schProcessOnCbBegin
(
SSchJob
**
job
,
SSchTask
**
task
,
uint64_t
qId
,
int64_t
rId
,
uint64_t
tId
);
void
schDropTaskOnExecNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
bool
schJobDone
(
SSchJob
*
pJob
);
int32_t
schRemoveTaskFromExecList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schLaunchJobLowerLevel
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schSwitchJobStatus
(
SSchJob
*
pJob
,
int32_t
status
,
void
*
param
);
int32_t
schHandleOpBeginEvent
(
int64_t
jobId
,
SSchJob
**
job
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
);
int32_t
schHandleOpEndEvent
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
,
int32_t
errCode
);
int32_t
schHandleTaskRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
void
schUpdateJobErrCode
(
SSchJob
*
pJob
,
int32_t
errCode
);
int32_t
schTaskCheckSetRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
bool
*
needRetry
);
int32_t
schProcessOnJobFailure
(
SSchJob
*
pJob
,
int32_t
errCode
);
int32_t
schProcessOnJobPartialSuccess
(
SSchJob
*
pJob
);
void
schFreeTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
void
schDropTaskInHashList
(
SSchJob
*
pJob
,
SHashObj
*
list
);
int32_t
schLaunchLevelTasks
(
SSchJob
*
pJob
,
SSchLevel
*
level
);
int32_t
schGetTaskFromList
(
SHashObj
*
pTaskList
,
uint64_t
taskId
,
SSchTask
**
pTask
);
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
);
#ifdef __cplusplus
...
...
source/libs/scheduler/src/schDbg.c
浏览文件 @
8b896332
...
...
@@ -14,16 +14,16 @@
*/
#include "query.h"
#include "sch
eduler
Int.h"
#include "schInt.h"
tsem_t
schdRspSem
;
void
schdExecCallback
(
S
Query
Result
*
pResult
,
void
*
param
,
int32_t
code
)
{
void
schdExecCallback
(
S
Exec
Result
*
pResult
,
void
*
param
,
int32_t
code
)
{
if
(
code
)
{
pResult
->
code
=
code
;
}
*
(
S
Query
Result
*
)
param
=
*
pResult
;
*
(
S
Exec
Result
*
)
param
=
*
pResult
;
taosMemoryFree
(
pResult
);
...
...
source/libs/scheduler/src/schFlowCtrl.c
浏览文件 @
8b896332
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sch
eduler
Int.h"
#include "schInt.h"
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
8b896332
此差异已折叠。
点击以展开。
source/libs/scheduler/src/schRemote.c
浏览文件 @
8b896332
...
...
@@ -16,7 +16,7 @@
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "sch
eduler
Int.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
...
...
@@ -37,7 +37,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
TMSG_INFO
(
msgType
));
}
if
(
taskStatus
!=
JOB_TASK_STATUS_EXEC
UTING
&&
taskStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
if
(
taskStatus
!=
JOB_TASK_STATUS_EXEC
&&
taskStatus
!=
JOB_TASK_STATUS_PART_SUCC
)
{
SCH_TASK_DLOG
(
"rsp msg conflicted with task status, status:%s, rspType:%s"
,
jobTaskStatusStr
(
taskStatus
),
TMSG_INFO
(
msgType
));
}
...
...
@@ -51,7 +51,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
taskStatus
!=
JOB_TASK_STATUS_EXEC
UTING
&&
taskStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
if
(
taskStatus
!=
JOB_TASK_STATUS_EXEC
&&
taskStatus
!=
JOB_TASK_STATUS_PART_SUCC
)
{
SCH_TASK_ELOG
(
"rsp msg conflicted with task status, status:%s, rspType:%s"
,
jobTaskStatusStr
(
taskStatus
),
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
...
...
@@ -76,7 +76,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
if
(
taskStatus
!=
JOB_TASK_STATUS_EXEC
UTING
&&
taskStatus
!=
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
if
(
taskStatus
!=
JOB_TASK_STATUS_EXEC
&&
taskStatus
!=
JOB_TASK_STATUS_PART_SUCC
)
{
SCH_TASK_ELOG
(
"rsp msg conflicted with task status, status:%s, rspType:%s"
,
jobTaskStatusStr
(
taskStatus
),
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
...
...
@@ -88,9 +88,21 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
}
// Note: no more task error processing, handled in function internal
int32_t
schHandleResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
)
{
int32_t
schHandleResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
execId
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
char
*
msg
=
pMsg
->
pData
;
int32_t
msgSize
=
pMsg
->
len
;
int32_t
msgType
=
pMsg
->
msgType
;
bool
dropExecNode
=
(
msgType
==
TDMT_SCH_LINK_BROKEN
||
SCH_NETWORK_ERR
(
rspCode
));
SCH_ERR_JRET
(
schUpdateTaskHandle
(
pJob
,
pTask
,
dropExecNode
,
pMsg
->
handle
,
execId
));
SCH_ERR_JRET
(
schValidateReceivedMsgType
(
pJob
,
pTask
,
msgType
));
int32_t
reqType
=
IsReq
(
pMsg
)
?
pMsg
->
msgType
:
(
pMsg
->
msgType
-
1
);
if
(
SCH_NEED_REDIRECT
(
reqType
,
rspCode
,
pMsg
->
len
))
{
SCH_RET
(
schHandleRedirect
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
));
}
switch
(
msgType
)
{
case
TDMT_VND_CREATE_TABLE_RSP
:
{
...
...
@@ -308,7 +320,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
return
TSDB_CODE_SUCCESS
;
}
SCH_ERR_JRET
(
sch
FetchFromRemote
(
pJob
));
SCH_ERR_JRET
(
sch
LaunchFetchTask
(
pJob
));
taosMemoryFreeClear
(
msg
);
...
...
@@ -325,7 +337,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
htonl
(
rsp
->
numOfRows
));
if
(
rsp
->
completed
)
{
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_SUCC
EED
);
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_SUCC
);
}
SCH_TASK_DLOG
(
"got fetch rsp, rows:%d, complete:%d"
,
htonl
(
rsp
->
numOfRows
),
rsp
->
completed
);
...
...
@@ -362,65 +374,24 @@ _return:
int32_t
schHandleCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int32_t
msgType
=
pMsg
->
msgType
;
SSchTaskCallbackParam
*
pParam
=
(
SSchTaskCallbackParam
*
)
param
;
SSchTask
*
pTask
=
NULL
;
SSchJob
*
pJob
=
NULL
;
SSchJob
*
pJob
=
schAcquireJob
(
pParam
->
refId
);
if
(
NULL
==
pJob
)
{
qWarn
(
"QID:0x%"
PRIx64
",TID:0x%"
PRIx64
"taosAcquireRef job failed, may be dropped, refId:0x%"
PRIx64
,
pParam
->
queryId
,
pParam
->
taskId
,
pParam
->
refId
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_JOB_FREED
);
}
SCH_ERR_JRET
(
schGetTaskInJob
(
pJob
,
pParam
->
taskId
,
&
pTask
));
SCH_LOCK_TASK
(
pTask
);
SCH_TASK_DLOG
(
"rsp msg received, type:%s, handle:%p, code:%s"
,
TMSG_INFO
(
msgType
),
pMsg
->
handle
,
tstrerror
(
rspCode
));
if
(
pParam
->
execId
!=
pTask
->
execId
)
{
SCH_TASK_DLOG
(
"execId %d mis-match current execId %d"
,
pParam
->
execId
,
pTask
->
execId
);
goto
_return
;
}
bool
dropExecNode
=
(
msgType
==
TDMT_SCH_LINK_BROKEN
||
SCH_NETWORK_ERR
(
rspCode
));
SCH_ERR_JRET
(
schUpdateTaskHandle
(
pJob
,
pTask
,
dropExecNode
,
pMsg
->
handle
,
pParam
->
execId
));
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_ELOG
(
"rsp will not be processed cause of job status %s, rspCode:0x%x"
,
jobTaskStatusStr
(
status
),
rspCode
);
code
=
atomic_load_32
(
&
pJob
->
errCode
);
goto
_return
;
}
SCH_ERR_JRET
(
schValidateReceivedMsgType
(
pJob
,
pTask
,
msgType
));
qDebug
(
"begin to handle rsp msg, type:%s, handle:%p, code:%s"
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
handle
,
tstrerror
(
rspCode
));
int32_t
reqType
=
IsReq
(
pMsg
)
?
pMsg
->
msgType
:
(
pMsg
->
msgType
-
1
);
if
(
SCH_NEED_REDIRECT
(
reqType
,
rspCode
,
pMsg
->
len
))
{
code
=
schHandleRedirect
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
);
goto
_return
;
}
SCH_ERR_RET
(
schProcessOnCbBegin
(
&
pJob
,
&
pTask
,
pParam
->
queryId
,
pParam
->
refId
,
pParam
->
taskId
));
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
);
code
=
schHandleResponseMsg
(
pJob
,
pTask
,
pParam
->
execId
,
pMsg
,
rspCode
);
pMsg
->
pData
=
NULL
;
_return:
if
(
pTask
)
{
if
(
code
)
{
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
);
}
SCH_UNLOCK_TASK
(
pTask
);
}
if
(
pJob
)
{
schReleaseJob
(
pParam
->
refId
);
}
schProcessOnCbEnd
(
pJob
,
pTask
,
code
);
taosMemoryFreeClear
(
pMsg
->
pData
);
taosMemoryFreeClear
(
param
);
qDebug
(
"end to handle rsp msg, type:%s, handle:%p, code:%s"
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
handle
,
tstrerror
(
rspCode
));
SCH_RET
(
code
);
}
...
...
@@ -451,6 +422,37 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
}
int32_t
schHandleHbCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
)
{
SSchedulerHbRsp
rsp
=
{
0
};
SSchTaskCallbackParam
*
pParam
=
(
SSchTaskCallbackParam
*
)
param
;
if
(
code
)
{
qError
(
"hb rsp error:%s"
,
tstrerror
(
code
));
SCH_ERR_JRET
(
code
);
}
if
(
tDeserializeSSchedulerHbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
rsp
))
{
qError
(
"invalid hb rsp msg, size:%d"
,
pMsg
->
len
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSchTrans
trans
=
{
0
};
trans
.
pTrans
=
pParam
->
pTrans
;
trans
.
pHandle
=
pMsg
->
handle
;
SCH_ERR_JRET
(
schUpdateHbConnection
(
&
rsp
.
epId
,
&
trans
));
SCH_ERR_JRET
(
schProcessOnTaskStatusRsp
(
&
rsp
.
epId
,
rsp
.
taskStatus
));
_return:
tFreeSSchedulerHbRsp
(
&
rsp
);
taosMemoryFree
(
param
);
SCH_RET
(
code
);
}
int32_t
schMakeCallbackParam
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
,
bool
isHb
,
SSchTrans
*
trans
,
void
**
pParam
)
{
if
(
!
isHb
)
{
SSchTaskCallbackParam
*
param
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchTaskCallbackParam
));
...
...
@@ -692,36 +694,6 @@ _return:
SCH_RET
(
code
);
}
int32_t
schHandleHbCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
)
{
SSchedulerHbRsp
rsp
=
{
0
};
SSchTaskCallbackParam
*
pParam
=
(
SSchTaskCallbackParam
*
)
param
;
if
(
code
)
{
qError
(
"hb rsp error:%s"
,
tstrerror
(
code
));
SCH_ERR_JRET
(
code
);
}
if
(
tDeserializeSSchedulerHbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
rsp
))
{
qError
(
"invalid hb rsp msg, size:%d"
,
pMsg
->
len
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSchTrans
trans
=
{
0
};
trans
.
pTrans
=
pParam
->
pTrans
;
trans
.
pHandle
=
pMsg
->
handle
;
SCH_ERR_JRET
(
schUpdateHbConnection
(
&
rsp
.
epId
,
&
trans
));
SCH_ERR_JRET
(
schProcessOnTaskStatusRsp
(
&
rsp
.
epId
,
rsp
.
taskStatus
));
_return:
tFreeSSchedulerHbRsp
(
&
rsp
);
taosMemoryFree
(
param
);
SCH_RET
(
code
);
}
int32_t
schMakeBrokenLinkVal
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRpcBrokenlinkVal
*
brokenVal
,
bool
isHb
)
{
int32_t
code
=
0
;
int32_t
msgType
=
TDMT_SCH_LINK_BROKEN
;
...
...
source/libs/scheduler/src/schStatus.c
0 → 100644
浏览文件 @
8b896332
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
int32_t
schSwitchJobStatus
(
SSchJob
*
pJob
,
int32_t
status
,
void
*
param
)
{
int32_t
code
=
0
;
SCH_ERR_JRET
(
schUpdateJobStatus
(
pJob
,
status
));
switch
(
status
)
{
case
JOB_TASK_STATUS_INIT
:
break
;
case
JOB_TASK_STATUS_EXEC
:
SCH_ERR_JRET
(
schExecJob
(
pJob
,
(
SSchedulerReq
*
)
param
));
break
;
case
JOB_TASK_STATUS_PART_SUCC
:
SCH_ERR_JRET
(
schProcessOnJobPartialSuccess
(
pJob
));
break
;
case
JOB_TASK_STATUS_SUCC
:
break
;
case
JOB_TASK_STATUS_FAIL
:
SCH_RET
(
schProcessOnJobFailure
(
pJob
,
(
param
?
*
(
int32_t
*
)
param
:
0
)));
break
;
case
JOB_TASK_STATUS_DROP
:
SCH_ERR_JRET
(
schProcessOnJobDropped
(
pJob
,
*
(
int32_t
*
)
param
));
if
(
taosRemoveRef
(
schMgmt
.
jobRef
,
pJob
->
refId
))
{
SCH_JOB_ELOG
(
"remove job from job list failed, refId:0x%"
PRIx64
,
pJob
->
refId
);
}
else
{
SCH_JOB_DLOG
(
"job removed from jobRef list, refId:0x%"
PRIx64
,
pJob
->
refId
);
}
break
;
default:
{
SCH_JOB_ELOG
(
"unknown job status %d"
,
status
);
SCH_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
}
return
TSDB_CODE_SUCCESS
;
_return:
SCH_RET
(
schProcessOnJobFailure
(
pJob
,
code
));
}
int32_t
schHandleOpBeginEvent
(
int64_t
jobId
,
SSchJob
**
job
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
)
{
SSchJob
*
pJob
=
schAcquireJob
(
jobId
);
if
(
NULL
==
pJob
)
{
qError
(
"Acquire sch job failed, may be dropped, jobId:0x%"
PRIx64
,
jobId
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
*
job
=
pJob
;
SCH_RET
(
schProcessOnOpBegin
(
pJob
,
type
,
pReq
));
}
int32_t
schHandleOpEndEvent
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
,
int32_t
errCode
)
{
int32_t
code
=
errCode
;
if
(
NULL
==
pJob
)
{
SCH_RET
(
code
);
}
schProcessOnOpEnd
(
pJob
,
type
,
pReq
,
errCode
);
if
(
TSDB_CODE_SCH_IGNORE_ERROR
==
errCode
)
{
code
=
pJob
->
errCode
;
}
schReleaseJob
(
pJob
->
refId
);
return
code
;
}
source/libs/scheduler/src/schTask.c
0 → 100644
浏览文件 @
8b896332
此差异已折叠。
点击以展开。
source/libs/scheduler/src/schUtil.c
浏览文件 @
8b896332
...
...
@@ -16,11 +16,25 @@
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "sch
eduler
Int.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
qDebug
(
"sch acquire jobId:0x%"
PRIx64
,
refId
);
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
}
FORCE_INLINE
int32_t
schReleaseJob
(
int64_t
refId
)
{
if
(
0
==
refId
)
{
return
TSDB_CODE_SUCCESS
;
}
qDebug
(
"sch release jobId:0x%"
PRIx64
,
refId
);
return
taosReleaseRef
(
schMgmt
.
jobRef
,
refId
);
}
char
*
schGetOpStr
(
SCH_OP_TYPE
type
)
{
switch
(
type
)
{
case
SCH_OP_NULL
:
...
...
@@ -29,6 +43,8 @@ char* schGetOpStr(SCH_OP_TYPE type) {
return
"EXEC"
;
case
SCH_OP_FETCH
:
return
"FETCH"
;
case
SCH_OP_GET_STATUS
:
return
"GET STATUS"
;
default:
return
"UNKNOWN"
;
}
...
...
@@ -283,3 +299,20 @@ void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo) {
taosMemoryFree
(
msgSendInfo
);
}
int32_t
schGetTaskFromList
(
SHashObj
*
pTaskList
,
uint64_t
taskId
,
SSchTask
**
pTask
)
{
int32_t
s
=
taosHashGetSize
(
pTaskList
);
if
(
s
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
SSchTask
**
task
=
taosHashGet
(
pTaskList
,
&
taskId
,
sizeof
(
taskId
));
if
(
NULL
==
task
||
NULL
==
(
*
task
))
{
return
TSDB_CODE_SUCCESS
;
}
*
pTask
=
*
task
;
return
TSDB_CODE_SUCCESS
;
}
source/libs/scheduler/src/scheduler.c
浏览文件 @
8b896332
...
...
@@ -16,7 +16,7 @@
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "sch
eduler
Int.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
...
...
@@ -67,121 +67,45 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
schedulerExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJobId
,
SQueryResult
*
pRes
)
{
qDebug
(
"scheduler
sync exec job start
"
);
int32_t
schedulerExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJobId
)
{
qDebug
(
"scheduler
%s exec job start"
,
pReq
->
syncReq
?
"SYNC"
:
"ASYNC
"
);
int32_t
code
=
0
;
SSchJob
*
pJob
=
NULL
;
SCH_ERR_JRET
(
schInitJob
(
pReq
,
&
pJob
));
*
pJobId
=
pJob
->
refId
;
SCH_ERR_JRET
(
schExecJobImpl
(
pReq
,
pJob
,
true
));
SCH_ERR_JRET
(
schInitJob
(
pJobId
,
pReq
));
_return:
SCH_ERR_JRET
(
schHandleOpBeginEvent
(
*
pJobId
,
&
pJob
,
SCH_OP_EXEC
,
pReq
));
if
(
code
&&
NULL
==
pJob
)
{
qDestroyQueryPlan
(
pReq
->
pDag
);
}
if
(
pJob
)
{
schSetJobQueryRes
(
pJob
,
pRes
);
schReleaseJob
(
pJob
->
refId
);
}
return
code
;
}
SCH_ERR_JRET
(
schSwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_INIT
,
pReq
));
int32_t
schedulerAsyncExecJob
(
SSchedulerReq
*
pReq
,
int64_t
*
pJobId
)
{
qDebug
(
"scheduler async exec job start"
);
int32_t
code
=
0
;
SSchJob
*
pJob
=
NULL
;
SCH_ERR_JRET
(
schInitJob
(
pReq
,
&
pJob
));
*
pJobId
=
pJob
->
refId
;
SCH_ERR_JRET
(
schExecJobImpl
(
pReq
,
pJob
,
false
));
SCH_ERR_JRET
(
schSwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_EXEC
,
pReq
));
_return:
if
(
code
&&
NULL
==
pJob
)
{
qDestroyQueryPlan
(
pReq
->
pDag
);
}
if
(
pJob
)
{
schReleaseJob
(
pJob
->
refId
);
}
return
code
;
SCH_RET
(
schHandleOpEndEvent
(
pJob
,
SCH_OP_EXEC
,
pReq
,
code
));
}
int32_t
schedulerFetchRows
(
int64_t
job
,
void
**
pData
)
{
qDebug
(
"scheduler sync fetch rows start"
);
if
(
NULL
==
pData
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
schedulerFetchRows
(
int64_t
jobId
,
SSchedulerReq
*
pReq
)
{
qDebug
(
"scheduler %s fetch rows start"
,
pReq
->
syncReq
?
"SYNC"
:
"ASYNC"
);
int32_t
code
=
0
;
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, jobId:0x%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_ERR_RET
(
schBeginOperation
(
pJob
,
SCH_OP_FETCH
,
true
));
pJob
->
userRes
.
fetchRes
=
pData
;
code
=
schFetchRows
(
pJob
);
schReleaseJob
(
job
);
SCH_RET
(
code
);
}
void
schedulerAsyncFetchRows
(
int64_t
job
,
schedulerFetchFp
fp
,
void
*
param
)
{
qDebug
(
"scheduler async fetch rows start"
);
SSchJob
*
pJob
=
NULL
;
int32_t
code
=
0
;
if
(
NULL
==
fp
||
NULL
==
param
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SCH_ERR_JRET
(
schHandleOpBeginEvent
(
jobId
,
&
pJob
,
SCH_OP_FETCH
,
pReq
));
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire sch job from job list failed, may be dropped, jobId:0x%"
PRIx64
,
job
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_ERR_JRET
(
schBeginOperation
(
pJob
,
SCH_OP_FETCH
,
false
));
pJob
->
userRes
.
fetchFp
=
fp
;
pJob
->
userRes
.
userParam
=
param
;
SCH_ERR_JRET
(
schAsyncFetchRows
(
pJob
));
SCH_ERR_JRET
(
schJobFetchRows
(
pJob
));
_return:
if
(
code
)
{
fp
(
NULL
,
param
,
code
);
}
schReleaseJob
(
job
);
SCH_RET
(
schHandleOpEndEvent
(
pJob
,
SCH_OP_FETCH
,
pReq
,
code
));
}
int32_t
schedulerGetTasksStatus
(
int64_t
job
,
SArray
*
pSub
)
{
int32_t
schedulerGetTasksStatus
(
int64_t
job
Id
,
SArray
*
pSub
)
{
int32_t
code
=
0
;
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qDebug
(
"acquire job from jobRef list failed, may not started or dropped, refId:0x%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SSchJob
*
pJob
=
NULL
;
if
(
pJob
->
status
<
JOB_TASK_STATUS_NOT_START
||
pJob
->
levelNum
<=
0
||
NULL
==
pJob
->
levels
)
{
qDebug
(
"job not initialized or not executable job, refId:0x%"
PRIx64
,
job
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
SCH_ERR_JRET
(
schHandleOpBeginEvent
(
jobId
,
&
pJob
,
SCH_OP_GET_STATUS
,
NULL
));
for
(
int32_t
i
=
pJob
->
levelNum
-
1
;
i
>=
0
;
--
i
)
{
SSchLevel
*
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
...
...
@@ -198,23 +122,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
_return:
schReleaseJob
(
job
);
SCH_RET
(
code
);
}
int32_t
scheduleCancelJob
(
int64_t
job
)
{
SSchJob
*
pJob
=
schAcquireJob
(
job
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire job from jobRef list failed, may be dropped, jobId:0x%"
PRIx64
,
job
);
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
int32_t
code
=
schCancelJob
(
pJob
);
schReleaseJob
(
job
);
SCH_RET
(
code
);
SCH_RET
(
schHandleOpEndEvent
(
pJob
,
SCH_OP_GET_STATUS
,
NULL
,
code
));
}
void
schedulerStopQueryHb
(
void
*
pTrans
)
{
...
...
@@ -225,33 +133,23 @@ void schedulerStopQueryHb(void *pTrans) {
schCleanClusterHb
(
pTrans
);
}
void
schedulerFreeJob
(
int64_t
*
job
,
int32_t
errCode
)
{
if
(
0
==
*
job
)
{
void
schedulerFreeJob
(
int64_t
*
job
Id
,
int32_t
errCode
)
{
if
(
0
==
*
job
Id
)
{
return
;
}
SSchJob
*
pJob
=
schAcquireJob
(
*
job
);
SSchJob
*
pJob
=
schAcquireJob
(
*
job
Id
);
if
(
NULL
==
pJob
)
{
qError
(
"acquire sch job failed, may be dropped, jobId:0x%"
PRIx64
,
*
job
);
*
job
=
0
;
qError
(
"Acquire sch job failed, may be dropped, jobId:0x%"
PRIx64
,
*
jobId
);
return
;
}
int32_t
code
=
schProcessOnJobDropped
(
pJob
,
errCode
);
if
(
TSDB_CODE_SCH_JOB_IS_DROPPING
==
code
)
{
SCH_JOB_DLOG
(
"sch job is already dropping, refId:0x%"
PRIx64
,
*
job
);
*
job
=
0
;
if
(
schJobDone
(
pJob
))
{
return
;
}
SCH_JOB_DLOG
(
"start to remove job from jobRef list, refId:0x%"
PRIx64
,
*
job
);
if
(
taosRemoveRef
(
schMgmt
.
jobRef
,
*
job
))
{
SCH_JOB_ELOG
(
"remove job from job list failed, refId:0x%"
PRIx64
,
*
job
);
}
schReleaseJob
(
*
job
);
*
job
=
0
;
schSwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_DROP
,
(
void
*
)
&
errCode
);
*
jobId
=
0
;
}
void
schedulerDestroy
(
void
)
{
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
8b896332
...
...
@@ -50,7 +50,7 @@
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"
#include "sch
eduler
Int.h"
#include "schInt.h"
#include "stub.h"
#include "tref.h"
...
...
@@ -87,7 +87,7 @@ void schtInitLogFile() {
}
void
schtQueryCb
(
S
Query
Result
*
pResult
,
void
*
param
,
int32_t
code
)
{
void
schtQueryCb
(
S
Exec
Result
*
pResult
,
void
*
param
,
int32_t
code
)
{
assert
(
TSDB_CODE_SUCCESS
==
code
);
*
(
int32_t
*
)
param
=
1
;
}
...
...
@@ -507,14 +507,15 @@ void* schtRunJobThread(void *aa) {
SRequestConnInfo
conn
=
{
0
};
conn
.
pTrans
=
mockPointer
;
SSchedulerReq
req
=
{
0
};
req
.
syncReq
=
false
;
req
.
pConn
=
&
conn
;
req
.
pNodeList
=
qnodeList
;
req
.
pDag
=
&
dag
;
req
.
sql
=
"select * from tb"
;
req
.
execFp
=
schtQueryCb
;
req
.
exec
Param
=
&
queryDone
;
req
.
cb
Param
=
&
queryDone
;
code
=
scheduler
Async
ExecJob
(
&
req
,
&
queryJobRefId
);
code
=
schedulerExecJob
(
&
req
,
&
queryJobRefId
);
assert
(
code
==
0
);
pJob
=
schAcquireJob
(
queryJobRefId
);
...
...
@@ -584,7 +585,10 @@ void* schtRunJobThread(void *aa) {
atomic_store_32
(
&
schtStartFetch
,
1
);
void
*
data
=
NULL
;
code
=
schedulerFetchRows
(
queryJobRefId
,
&
data
);
req
.
syncReq
=
true
;
req
.
pFetchRes
=
&
data
;
code
=
schedulerFetchRows
(
queryJobRefId
,
&
req
);
assert
(
code
==
0
||
code
);
if
(
0
==
code
)
{
...
...
@@ -594,7 +598,7 @@ void* schtRunJobThread(void *aa) {
}
data
=
NULL
;
code
=
schedulerFetchRows
(
queryJobRefId
,
&
data
);
code
=
schedulerFetchRows
(
queryJobRefId
,
&
req
);
assert
(
code
==
0
||
code
);
schtFreeQueryJob
(
0
);
...
...
@@ -658,15 +662,15 @@ TEST(queryTest, normalCase) {
SRequestConnInfo
conn
=
{
0
};
conn
.
pTrans
=
mockPointer
;
SSchedulerReq
req
=
{
0
};
SSchedulerReq
req
=
{
0
};
req
.
pConn
=
&
conn
;
req
.
pNodeList
=
qnodeList
;
req
.
pDag
=
&
dag
;
req
.
sql
=
"select * from tb"
;
req
.
execFp
=
schtQueryCb
;
req
.
exec
Param
=
&
queryDone
;
req
.
cb
Param
=
&
queryDone
;
code
=
scheduler
Async
ExecJob
(
&
req
,
&
job
);
code
=
schedulerExecJob
(
&
req
,
&
job
);
ASSERT_EQ
(
code
,
0
);
...
...
@@ -709,7 +713,10 @@ TEST(queryTest, normalCase) {
taosThreadCreate
(
&
(
thread1
),
&
thattr
,
schtCreateFetchRspThread
,
&
job
);
void
*
data
=
NULL
;
code
=
schedulerFetchRows
(
job
,
&
data
);
req
.
syncReq
=
true
;
req
.
pFetchRes
=
&
data
;
code
=
schedulerFetchRows
(
job
,
&
req
);
ASSERT_EQ
(
code
,
0
);
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
...
...
@@ -718,7 +725,7 @@ TEST(queryTest, normalCase) {
taosMemoryFreeClear
(
data
);
data
=
NULL
;
code
=
schedulerFetchRows
(
job
,
&
data
);
code
=
schedulerFetchRows
(
job
,
&
req
);
ASSERT_EQ
(
code
,
0
);
ASSERT_TRUE
(
data
==
NULL
);
...
...
@@ -768,8 +775,8 @@ TEST(queryTest, readyFirstCase) {
req
.
pDag
=
&
dag
;
req
.
sql
=
"select * from tb"
;
req
.
execFp
=
schtQueryCb
;
req
.
exec
Param
=
&
queryDone
;
code
=
scheduler
Async
ExecJob
(
&
req
,
&
job
);
req
.
cb
Param
=
&
queryDone
;
code
=
schedulerExecJob
(
&
req
,
&
job
);
ASSERT_EQ
(
code
,
0
);
...
...
@@ -813,7 +820,9 @@ TEST(queryTest, readyFirstCase) {
taosThreadCreate
(
&
(
thread1
),
&
thattr
,
schtCreateFetchRspThread
,
&
job
);
void
*
data
=
NULL
;
code
=
schedulerFetchRows
(
job
,
&
data
);
req
.
syncReq
=
true
;
req
.
pFetchRes
=
&
data
;
code
=
schedulerFetchRows
(
job
,
&
req
);
ASSERT_EQ
(
code
,
0
);
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
...
...
@@ -822,7 +831,7 @@ TEST(queryTest, readyFirstCase) {
taosMemoryFreeClear
(
data
);
data
=
NULL
;
code
=
schedulerFetchRows
(
job
,
&
data
);
code
=
schedulerFetchRows
(
job
,
&
req
);
ASSERT_EQ
(
code
,
0
);
ASSERT_TRUE
(
data
==
NULL
);
...
...
@@ -875,9 +884,9 @@ TEST(queryTest, flowCtrlCase) {
req
.
pDag
=
&
dag
;
req
.
sql
=
"select * from tb"
;
req
.
execFp
=
schtQueryCb
;
req
.
exec
Param
=
&
queryDone
;
req
.
cb
Param
=
&
queryDone
;
code
=
scheduler
Async
ExecJob
(
&
req
,
&
job
);
code
=
schedulerExecJob
(
&
req
,
&
job
);
ASSERT_EQ
(
code
,
0
);
...
...
@@ -925,7 +934,9 @@ TEST(queryTest, flowCtrlCase) {
taosThreadCreate
(
&
(
thread1
),
&
thattr
,
schtCreateFetchRspThread
,
&
job
);
void
*
data
=
NULL
;
code
=
schedulerFetchRows
(
job
,
&
data
);
req
.
syncReq
=
true
;
req
.
pFetchRes
=
&
data
;
code
=
schedulerFetchRows
(
job
,
&
req
);
ASSERT_EQ
(
code
,
0
);
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
data
;
...
...
@@ -934,7 +945,7 @@ TEST(queryTest, flowCtrlCase) {
taosMemoryFreeClear
(
data
);
data
=
NULL
;
code
=
schedulerFetchRows
(
job
,
&
data
);
code
=
schedulerFetchRows
(
job
,
&
req
);
ASSERT_EQ
(
code
,
0
);
ASSERT_TRUE
(
data
==
NULL
);
...
...
@@ -978,7 +989,7 @@ TEST(insertTest, normalCase) {
TdThread
thread1
;
taosThreadCreate
(
&
(
thread1
),
&
thattr
,
schtSendRsp
,
&
insertJobRefId
);
S
Query
Result
res
=
{
0
};
S
Exec
Result
res
=
{
0
};
SRequestConnInfo
conn
=
{
0
};
conn
.
pTrans
=
mockPointer
;
...
...
@@ -988,9 +999,9 @@ TEST(insertTest, normalCase) {
req
.
pDag
=
&
dag
;
req
.
sql
=
"insert into tb values(now,1)"
;
req
.
execFp
=
schtQueryCb
;
req
.
exec
Param
=
NULL
;
req
.
cb
Param
=
NULL
;
code
=
schedulerExecJob
(
&
req
,
&
insertJobRefId
,
&
res
);
code
=
schedulerExecJob
(
&
req
,
&
insertJobRefId
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
res
.
numOfRows
,
20
);
...
...
source/util/src/terror.c
浏览文件 @
8b896332
...
...
@@ -394,6 +394,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping")
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_DUPLICATTED_OPERATION
,
"Duplicatted operation"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_MSG_ERROR
,
"Task message error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_JOB_FREED
,
"Job already freed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_JOB_NOT_EXIST
,
"Job not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_STATUS_ERROR
,
"Task status error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_JSON_IN_ERROR
,
"Json not support in in/notin operator"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR
,
"Json not support in this place"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录