Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c91d8a59
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
c91d8a59
编写于
4月 04, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'main' into feature/3_liaohj
上级
d48be5c0
dace3393
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
344 addition
and
102 deletion
+344
-102
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-0
source/client/inc/clientLog.h
source/client/inc/clientLog.h
+1
-0
source/client/inc/clientStmt.h
source/client/inc/clientStmt.h
+4
-0
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+1
-0
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+19
-3
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-1
source/dnode/mnode/impl/src/mndIndex.c
source/dnode/mnode/impl/src/mndIndex.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+4
-9
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+5
-5
source/libs/index/src/indexFstFile.c
source/libs/index/src/indexFstFile.c
+0
-2
source/libs/parser/src/parCalcConst.c
source/libs/parser/src/parCalcConst.c
+6
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+21
-2
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+1
-1
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+2
-0
source/libs/qworker/src/qwDbg.c
source/libs/qworker/src/qwDbg.c
+14
-3
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+33
-11
source/libs/scheduler/src/schFlowCtrl.c
source/libs/scheduler/src/schFlowCtrl.c
+0
-1
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+91
-5
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+34
-15
source/libs/scheduler/src/schStatus.c
source/libs/scheduler/src/schStatus.c
+3
-0
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+45
-37
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+1
-1
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+2
-0
tests/script/api/batchprepare.c
tests/script/api/batchprepare.c
+50
-1
tests/script/tsim/query/interval-offset.sim
tests/script/tsim/query/interval-offset.sim
+2
-2
未找到文件。
cmake/taostools_CMakeLists.txt.in
浏览文件 @
c91d8a59
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
d194dc9
GIT_TAG
273a3fe
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
include/libs/qcom/query.h
浏览文件 @
c91d8a59
...
...
@@ -33,6 +33,7 @@ typedef enum {
JOB_TASK_STATUS_INIT
,
JOB_TASK_STATUS_EXEC
,
JOB_TASK_STATUS_PART_SUCC
,
JOB_TASK_STATUS_FETCH
,
JOB_TASK_STATUS_SUCC
,
JOB_TASK_STATUS_FAIL
,
JOB_TASK_STATUS_DROP
,
...
...
source/client/inc/clientLog.h
浏览文件 @
c91d8a59
...
...
@@ -26,6 +26,7 @@ extern "C" {
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", DEBUG_FATAL, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", DEBUG_ERROR, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", DEBUG_WARN, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscWarnL(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLongString("TSC WARN ", DEBUG_WARN, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", DEBUG_INFO, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0)
...
...
source/client/inc/clientStmt.h
浏览文件 @
c91d8a59
...
...
@@ -102,6 +102,7 @@ typedef struct STscStmt {
SStmtBindInfo
bInfo
;
int64_t
reqid
;
int32_t
errCode
;
}
STscStmt
;
extern
char
*
gStmtStatusStr
[];
...
...
@@ -121,6 +122,7 @@ extern char *gStmtStatusStr[];
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
pStmt->errCode = _code; \
return _code; \
} \
} while (0)
...
...
@@ -129,6 +131,7 @@ extern char *gStmtStatusStr[];
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
pStmt->errCode = _code; \
} \
return _code; \
} while (0)
...
...
@@ -137,6 +140,7 @@ extern char *gStmtStatusStr[];
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
pStmt->errCode = code; \
goto _return; \
} \
} while (0)
...
...
source/client/src/clientEnv.c
浏览文件 @
c91d8a59
...
...
@@ -103,6 +103,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
if
(
duration
>=
SLOW_QUERY_INTERVAL
)
{
atomic_add_fetch_64
((
int64_t
*
)
&
pActivity
->
numOfSlowQueries
,
1
);
tscWarnL
(
"slow query: %s, duration:%"
PRId64
,
pRequest
->
sqlstr
,
duration
);
}
releaseTscObj
(
pTscObj
->
id
);
...
...
source/client/src/clientStmt.c
浏览文件 @
c91d8a59
...
...
@@ -32,8 +32,14 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
STMT_LOG_SEQ
(
newStatus
);
}
if
(
pStmt
->
errCode
&&
newStatus
!=
STMT_PREPARE
)
{
STMT_DLOG
(
"stmt already failed with err: %s"
,
tstrerror
(
pStmt
->
errCode
));
return
pStmt
->
errCode
;
}
switch
(
newStatus
)
{
case
STMT_PREPARE
:
pStmt
->
errCode
=
0
;
break
;
case
STMT_SETTBNAME
:
if
(
STMT_STATUS_EQ
(
INIT
)
||
STMT_STATUS_EQ
(
BIND
)
||
STMT_STATUS_EQ
(
BIND_COL
))
{
...
...
@@ -197,7 +203,10 @@ int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHa
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
*
pVgHash
=
pStmt
->
sql
.
pVgHash
;
pStmt
->
sql
.
pVgHash
=
NULL
;
*
pBlockHash
=
pStmt
->
exec
.
pBlockHash
;
pStmt
->
exec
.
pBlockHash
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -325,6 +334,8 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
}
int32_t
stmtCleanSQLInfo
(
STscStmt
*
pStmt
)
{
STMT_DLOG_E
(
"start to free SQL info"
);
taosMemoryFree
(
pStmt
->
sql
.
queryRes
.
fields
);
taosMemoryFree
(
pStmt
->
sql
.
queryRes
.
userFields
);
taosMemoryFree
(
pStmt
->
sql
.
sqlStr
);
...
...
@@ -351,6 +362,8 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
memset
(
&
pStmt
->
sql
,
0
,
sizeof
(
pStmt
->
sql
));
STMT_DLOG_E
(
"end to free SQL info"
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -441,11 +454,10 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
.
mgmtEps
=
getEpSet_s
(
&
pStmt
->
taos
->
pAppInfo
->
mgmtEp
)};
int32_t
code
=
catalogGetTableMeta
(
pStmt
->
pCatalog
,
&
conn
,
&
pStmt
->
bInfo
.
sname
,
&
pTableMeta
);
if
(
TSDB_CODE_PAR_TABLE_NOT_EXIST
==
code
)
{
STMT_ERR_RET
(
stmtCleanBindInfo
(
pStmt
));
tscDebug
(
"tb %s not exist"
,
pStmt
->
bInfo
.
tbFName
);
stmtCleanBindInfo
(
pStmt
);
return
TSDB_CODE_SUCCESS
;
STMT_ERR_RET
(
code
)
;
}
STMT_ERR_RET
(
code
);
...
...
@@ -922,9 +934,13 @@ _return:
int
stmtClose
(
TAOS_STMT
*
stmt
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_DLOG_E
(
"start to free stmt"
);
stmtCleanSQLInfo
(
pStmt
);
taosMemoryFree
(
stmt
);
STMT_DLOG_E
(
"stmt freed"
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
c91d8a59
...
...
@@ -392,7 +392,7 @@ typedef struct {
}
SSmaObj
;
typedef
struct
{
char
name
[
TSDB_
TABLE
_FNAME_LEN
];
char
name
[
TSDB_
INDEX
_FNAME_LEN
];
char
stb
[
TSDB_TABLE_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
char
dstTbName
[
TSDB_TABLE_FNAME_LEN
];
...
...
source/dnode/mnode/impl/src/mndIndex.c
浏览文件 @
c91d8a59
...
...
@@ -138,7 +138,7 @@ static void *mndBuildDropIdxReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStbOb
mInfo
(
"idx: %s start to build drop index req"
,
pIdx
->
name
);
len
=
tSerializeSDropIdxReq
(
NULL
,
0
,
&
req
);
if
(
ret
<
0
)
{
if
(
len
<
0
)
{
goto
_err
;
}
...
...
@@ -672,7 +672,7 @@ _OVER:
static
int32_t
mndAddIndex
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SCreateTagIndexReq
*
req
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
int32_t
code
=
-
1
;
SIdxObj
idxObj
=
{
0
};
memcpy
(
idxObj
.
name
,
req
->
idxName
,
TSDB_
TABLE
_FNAME_LEN
);
memcpy
(
idxObj
.
name
,
req
->
idxName
,
TSDB_
INDEX
_FNAME_LEN
);
memcpy
(
idxObj
.
stb
,
pStb
->
name
,
TSDB_TABLE_FNAME_LEN
);
memcpy
(
idxObj
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
memcpy
(
idxObj
.
colName
,
req
->
colName
,
TSDB_COL_NAME_LEN
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
c91d8a59
...
...
@@ -3905,7 +3905,6 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return
code
;
}
pSchema
=
doGetSchemaForTSRow
(
TSDBROW_SVERSION
(
pRow
),
pReader
,
pBlockScanInfo
->
uid
);
tsdbRowMergerAdd
(
&
merge
,
pRow
,
pSchema
);
code
=
doMergeRowsInBuf
(
&
pBlockScanInfo
->
iter
,
pBlockScanInfo
->
uid
,
k
.
ts
,
pBlockScanInfo
->
delSkyline
,
&
merge
,
pReader
);
...
...
@@ -4246,10 +4245,6 @@ static void freeSchemaFunc(void* param) {
int32_t
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
void
*
pTableList
,
int32_t
numOfTables
,
SSDataBlock
*
pResBlock
,
STsdbReader
**
ppReader
,
const
char
*
idstr
,
bool
countOnly
)
{
STimeWindow
window
=
pCond
->
twindows
;
if
(
pCond
->
type
==
TIMEWINDOW_RANGE_EXTERNAL
)
{
pCond
->
twindows
.
skey
+=
1
;
pCond
->
twindows
.
ekey
-=
1
;
}
int32_t
capacity
=
pVnode
->
config
.
tsdbCfg
.
maxRows
;
if
(
pResBlock
!=
NULL
)
{
...
...
@@ -4272,11 +4267,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
// update the SQueryTableDataCond to create inner reader
int32_t
order
=
pCond
->
order
;
if
(
order
==
TSDB_ORDER_ASC
)
{
pCond
->
twindows
.
ekey
=
window
.
skey
;
pCond
->
twindows
.
ekey
=
window
.
skey
-
1
;
pCond
->
twindows
.
skey
=
INT64_MIN
;
pCond
->
order
=
TSDB_ORDER_DESC
;
}
else
{
pCond
->
twindows
.
skey
=
window
.
ekey
;
pCond
->
twindows
.
skey
=
window
.
ekey
+
1
;
pCond
->
twindows
.
ekey
=
INT64_MAX
;
pCond
->
order
=
TSDB_ORDER_ASC
;
}
...
...
@@ -4288,11 +4283,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
if
(
order
==
TSDB_ORDER_ASC
)
{
pCond
->
twindows
.
skey
=
window
.
ekey
;
pCond
->
twindows
.
skey
=
window
.
ekey
+
1
;
pCond
->
twindows
.
ekey
=
INT64_MAX
;
}
else
{
pCond
->
twindows
.
skey
=
INT64_MIN
;
pCond
->
twindows
.
ekey
=
window
.
ekey
;
pCond
->
twindows
.
ekey
=
window
.
ekey
-
1
;
}
pCond
->
order
=
order
;
...
...
source/libs/function/src/builtins.c
浏览文件 @
c91d8a59
...
...
@@ -480,14 +480,16 @@ static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t le
return
code
;
}
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
};
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
};
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateTimePseudoColumn
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
// pseudo column do not need to check parameters
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
};
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
};
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -509,13 +511,11 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return
invaildFuncParaNumErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
uint8_t
para1Type
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
.
type
;
if
(
!
IS_NUMERIC_TYPE
(
para1Type
))
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
for
(
int32_t
i
=
1
;
i
<
numOfParams
;
++
i
)
{
SValueNode
*
pValue
=
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
i
);
pValue
->
notReserved
=
true
;
...
...
@@ -2491,7 +2491,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"last_row"
,
.
type
=
FUNCTION_TYPE_LAST_ROW
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_MULTI_RES_FUNC
|
FUNC_MGT_SELECT_FUNC
|
FUNC_MGT_IMPLICIT_TS_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_MULTI_RES_FUNC
|
FUNC_MGT_SELECT_FUNC
|
FUNC_MGT_IMPLICIT_TS_FUNC
|
FUNC_MGT_KEEP_ORDER_FUNC
,
.
translateFunc
=
translateFirstLast
,
.
dynDataRequiredFunc
=
lastDynDataReq
,
.
getEnvFunc
=
getFirstLastFuncEnv
,
...
...
source/libs/index/src/indexFstFile.c
浏览文件 @
c91d8a59
...
...
@@ -127,8 +127,6 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of
blk
->
blockId
=
blkId
;
blk
->
nread
=
taosPReadFile
(
ctx
->
file
.
pFile
,
blk
->
buf
,
kBlockSize
,
blkId
*
kBlockSize
);
ASSERTS
(
blk
->
nread
<=
kBlockSize
,
"index read incomplete data"
);
if
(
blk
->
nread
>
kBlockSize
)
break
;
if
(
blk
->
nread
<
kBlockSize
&&
blk
->
nread
<
len
)
{
taosMemoryFree
(
blk
);
break
;
...
...
source/libs/parser/src/parCalcConst.c
浏览文件 @
c91d8a59
...
...
@@ -337,8 +337,14 @@ static SNodeList* getChildProjection(SNode* pStmt) {
static
void
eraseSetOpChildProjection
(
SSetOperator
*
pSetOp
,
int32_t
index
)
{
SNodeList
*
pLeftProjs
=
getChildProjection
(
pSetOp
->
pLeft
);
nodesListErase
(
pLeftProjs
,
nodesListGetCell
(
pLeftProjs
,
index
));
if
(
QUERY_NODE_SET_OPERATOR
==
nodeType
(
pSetOp
->
pLeft
))
{
eraseSetOpChildProjection
((
SSetOperator
*
)
pSetOp
->
pLeft
,
index
);
}
SNodeList
*
pRightProjs
=
getChildProjection
(
pSetOp
->
pRight
);
nodesListErase
(
pRightProjs
,
nodesListGetCell
(
pRightProjs
,
index
));
if
(
QUERY_NODE_SET_OPERATOR
==
nodeType
(
pSetOp
->
pRight
))
{
eraseSetOpChildProjection
((
SSetOperator
*
)
pSetOp
->
pRight
,
index
);
}
}
typedef
struct
SNotRefByOrderByCxt
{
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
c91d8a59
...
...
@@ -755,9 +755,11 @@ static bool isPrimaryKeyImpl(SNode* pExpr) {
}
else
if
(
QUERY_NODE_FUNCTION
==
nodeType
(
pExpr
))
{
SFunctionNode
*
pFunc
=
(
SFunctionNode
*
)
pExpr
;
if
(
FUNCTION_TYPE_SELECT_VALUE
==
pFunc
->
funcType
||
FUNCTION_TYPE_GROUP_KEY
==
pFunc
->
funcType
||
FUNCTION_TYPE_FIRST
==
pFunc
->
funcType
||
FUNCTION_TYPE_LAST
==
pFunc
->
funcType
)
{
FUNCTION_TYPE_FIRST
==
pFunc
->
funcType
||
FUNCTION_TYPE_LAST
==
pFunc
->
funcType
||
FUNCTION_TYPE_LAST_ROW
==
pFunc
->
funcType
)
{
return
isPrimaryKeyImpl
(
nodesListGetNode
(
pFunc
->
pParameterList
,
0
));
}
else
if
(
FUNCTION_TYPE_WSTART
==
pFunc
->
funcType
||
FUNCTION_TYPE_WEND
==
pFunc
->
funcType
||
FUNCTION_TYPE_IROWTS
==
pFunc
->
funcType
)
{
}
else
if
(
FUNCTION_TYPE_WSTART
==
pFunc
->
funcType
||
FUNCTION_TYPE_WEND
==
pFunc
->
funcType
||
FUNCTION_TYPE_IROWTS
==
pFunc
->
funcType
)
{
return
true
;
}
}
...
...
@@ -3119,6 +3121,19 @@ static const char* getPrecisionStr(uint8_t precision) {
return
"unknown"
;
}
static
void
convertVarDuration
(
SValueNode
*
pOffset
,
uint8_t
precision
)
{
const
int64_t
factors
[
3
]
=
{
NANOSECOND_PER_MSEC
,
NANOSECOND_PER_USEC
,
1
};
const
int8_t
units
[
3
]
=
{
TIME_UNIT_MILLISECOND
,
TIME_UNIT_MICROSECOND
,
TIME_UNIT_NANOSECOND
};
if
(
pOffset
->
unit
==
'n'
)
{
pOffset
->
datum
.
i
=
pOffset
->
datum
.
i
*
31
*
(
NANOSECOND_PER_DAY
/
factors
[
precision
]);
}
else
{
pOffset
->
datum
.
i
=
pOffset
->
datum
.
i
*
365
*
(
NANOSECOND_PER_DAY
/
factors
[
precision
]);
}
pOffset
->
unit
=
units
[
precision
];
}
static
int32_t
checkIntervalWindow
(
STranslateContext
*
pCxt
,
SIntervalWindowNode
*
pInterval
)
{
uint8_t
precision
=
((
SColumnNode
*
)
pInterval
->
pCol
)
->
node
.
resType
.
precision
;
...
...
@@ -3143,6 +3158,10 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
getMonthsFromTimeVal
(
pInter
->
datum
.
i
,
precision
,
pInter
->
unit
)))
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG
);
}
if
(
pOffset
->
unit
==
'n'
||
pOffset
->
unit
==
'y'
)
{
convertVarDuration
(
pOffset
,
precision
);
}
}
if
(
NULL
!=
pInterval
->
pSliding
)
{
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
c91d8a59
...
...
@@ -2279,7 +2279,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
if
(
NULL
!=
cxt
.
pLastCols
)
{
cxt
.
doAgg
=
false
;
lastRowScanOptSetLastTargets
(
pScan
->
pScanCols
,
cxt
.
pLastCols
);
nodesWalkExprs
(
pScan
->
pScanPseudoCols
,
lastRowScanOptSetColDataType
,
&
cxt
);
NODES_DESTORY_LIST
(
pScan
->
pScanPseudoCols
);
lastRowScanOptSetLastTargets
(
pScan
->
node
.
pTargets
,
cxt
.
pLastCols
);
nodesClearList
(
cxt
.
pLastCols
);
}
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
c91d8a59
...
...
@@ -194,6 +194,8 @@ char* jobTaskStatusStr(int32_t status) {
return
"EXECUTING"
;
case
JOB_TASK_STATUS_PART_SUCC
:
return
"PARTIAL_SUCCEED"
;
case
JOB_TASK_STATUS_FETCH
:
return
"FETCHING"
;
case
JOB_TASK_STATUS_SUCC
:
return
"SUCCEED"
;
case
JOB_TASK_STATUS_FAIL
:
...
...
source/libs/qworker/src/qwDbg.c
浏览文件 @
c91d8a59
...
...
@@ -259,15 +259,26 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
static
int32_t
ignoreTime
=
0
;
if
(
++
ignoreTime
>
10
&&
0
==
taosRand
()
%
9
)
{
if
(
ctx
->
msgType
==
TDMT_SCH_FETCH
)
{
qwBuildAndSendErrorRsp
(
TDMT_SCH_LINK_BROKEN
,
&
ctx
->
ctrlConnInfo
,
TSDB_CODE_RPC_BROKEN_LINK
);
qwBuildAndSendErrorRsp
(
ctx
->
msgType
+
1
,
&
ctx
->
dataConnInfo
,
TSDB_CODE_QRY_TASK_CTX_NOT_EXIST
);
*
rsped
=
true
;
taosSsleep
(
3
);
return
;
}
#if 0
SRpcHandleInfo *pConn =
((ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo
:
&
ctx
->
ctrlConnInfo
);
: &ctx->ctrlConnInfo);
qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK);
qwBuildAndSendDropMsg(QW_FPARAMS(), pConn);
*rsped = true;
return;
#endif
}
}
...
...
source/libs/scheduler/inc/schInt.h
浏览文件 @
c91d8a59
...
...
@@ -193,7 +193,7 @@ typedef struct SSchLevel {
int32_t
taskSucceed
;
int32_t
taskNum
;
int32_t
taskLaunchedNum
;
int32_t
taskDoneNum
;
int32_t
task
Exec
DoneNum
;
SArray
*
subTasks
;
// Element is SSchTask
}
SSchLevel
;
...
...
@@ -299,6 +299,7 @@ typedef struct SSchJob {
SExecResult
execRes
;
void
*
fetchRes
;
// TODO free it or not
bool
fetched
;
bool
noMoreRetry
;
int64_t
resNumOfRows
;
// from int32_t to int64_t
SSchResInfo
userRes
;
char
*
sql
;
...
...
@@ -333,13 +334,16 @@ extern SSchedulerMgmt schMgmt;
((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
(!SCH_IS_DATA_BIND_QRY_TASK(_task)))
#define SCH_UPDATE_REDICT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
#define SCH_GET_REDICT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
#define SCH_UPDATE_REDI
RE
CT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
#define SCH_GET_REDI
RE
CT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))
#define SCH_TASK_ALREADY_LAUNCHED(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_EXEC)
#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC)
#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL)
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
...
...
@@ -361,6 +365,7 @@ extern SSchedulerMgmt schMgmt;
(SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_MULTI_LEVEL_LAUNCHED(_job) ((_job)->levelIdx != ((_job)->levelNum - 1))
#define SCH_SET_JOB_TYPE(_job, type) \
do { \
...
...
@@ -377,16 +382,24 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
(SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect))
#define SCH_REDIRECT_MSGTYPE(_msgType) \
((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
(_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) \
(SCH_REDIRECT_MSGTYPE(_msgType) && \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
#define SCH_NEED_RETRY(_msgType, _code) \
((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_LOW_LEVEL_NETWORK_ERR(_job, _task, _code) \
(SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx))
#define SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code) \
(SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx))
#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) \
(SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect)
#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code))
#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code))))
#define SCH_TASK_NEED_RETRY(_msgType, _code) \
((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
...
...
@@ -510,6 +523,11 @@ extern SSchedulerMgmt schMgmt;
} \
} while (0)
#define SCH_RESET_JOB_LEVEL_IDX(_job) do { \
(_job)->levelIdx = (_job)->levelNum - 1; \
SCH_JOB_DLOG("set job levelIdx to %d", (_job)->levelIdx); \
} while (0)
void
schDeregisterTaskHb
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
void
schCleanClusterHb
(
void
*
pTrans
);
int32_t
schLaunchTask
(
SSchJob
*
job
,
SSchTask
*
task
);
...
...
@@ -562,7 +580,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
int32_t
schExecJob
(
SSchJob
*
pJob
,
SSchedulerReq
*
pReq
);
int32_t
schDumpJobExecRes
(
SSchJob
*
pJob
,
SExecResult
*
pRes
);
int32_t
schUpdateTaskCandidateAddr
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SEpSet
*
pEpSet
);
int32_t
schHandle
Redirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
);
int32_t
schHandle
TaskSetRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
);
void
schProcessOnOpEnd
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
,
int32_t
errCode
);
int32_t
schProcessOnOpBegin
(
SSchJob
*
pJob
,
SCH_OP_TYPE
type
,
SSchedulerReq
*
pReq
);
void
schProcessOnCbEnd
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
);
...
...
@@ -591,6 +609,10 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool
schChkCurrentOp
(
SSchJob
*
pJob
,
int32_t
op
,
int8_t
sync
);
int32_t
schProcessFetchRsp
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
char
*
msg
,
int32_t
rspCode
);
int32_t
schProcessExplainRsp
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SExplainRsp
*
rsp
);
int32_t
schHandleJobRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pMsg
,
int32_t
rspCode
);
int32_t
schChkResetJobRetry
(
SSchJob
*
pJob
,
int32_t
rspCode
);
void
schResetTaskForRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schChkUpdateRedirectCtx
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SEpSet
*
pEpSet
,
int32_t
rspCode
);
extern
SSchDebug
gSCHDebug
;
...
...
source/libs/scheduler/src/schFlowCtrl.c
浏览文件 @
c91d8a59
...
...
@@ -282,7 +282,6 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
}
int32_t
code
=
schLaunchTasksInFlowCtrlListImpl
(
pJob
,
ctrl
);
;
SCH_ERR_RET
(
code
);
return
code
;
// to avoid compiler error
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
c91d8a59
...
...
@@ -83,6 +83,10 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
oriStatus
=
SCH_GET_JOB_STATUS
(
pJob
);
if
(
oriStatus
==
newStatus
)
{
if
(
JOB_TASK_STATUS_FETCH
==
newStatus
)
{
return
code
;
}
SCH_ERR_JRET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
}
...
...
@@ -108,10 +112,19 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break
;
case
JOB_TASK_STATUS_PART_SUCC
:
if
(
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_SUCC
&&
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_EXEC
)
{
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_FETCH
)
{
SCH_ERR_JRET
(
TSDB_CODE_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_FETCH
:
if
(
newStatus
!=
JOB_TASK_STATUS_FAIL
&&
newStatus
!=
JOB_TASK_STATUS_SUCC
&&
newStatus
!=
JOB_TASK_STATUS_DROP
&&
newStatus
!=
JOB_TASK_STATUS_EXEC
&&
newStatus
!=
JOB_TASK_STATUS_FETCH
)
{
SCH_ERR_JRET
(
TSDB_CODE_APP_ERROR
);
}
break
;
case
JOB_TASK_STATUS_SUCC
:
case
JOB_TASK_STATUS_FAIL
:
...
...
@@ -288,7 +301,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
}
pJob
->
levelNum
=
levelNum
;
pJob
->
levelIdx
=
levelNum
-
1
;
SCH_RESET_JOB_LEVEL_IDX
(
pJob
)
;
SSchLevel
level
=
{
0
};
SNodeListNode
*
plans
=
NULL
;
...
...
@@ -550,9 +563,9 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
}
SSchLevel
*
pLevel
=
pTask
->
level
;
int32_t
doneNum
=
atomic_add_fetch_32
(
&
pLevel
->
taskDoneNum
,
1
);
int32_t
doneNum
=
atomic_add_fetch_32
(
&
pLevel
->
task
Exec
DoneNum
,
1
);
if
(
doneNum
==
pLevel
->
taskNum
)
{
pJob
->
levelIdx
--
;
atomic_sub_fetch_32
(
&
pJob
->
levelIdx
,
1
)
;
pLevel
=
taosArrayGet
(
pJob
->
levels
,
pJob
->
levelIdx
);
for
(
int32_t
i
=
0
;
i
<
pLevel
->
taskNum
;
++
i
)
{
...
...
@@ -562,6 +575,10 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
continue
;
}
if
(
SCH_TASK_ALREADY_LAUNCHED
(
pTask
))
{
continue
;
}
SCH_ERR_RET
(
schLaunchTask
(
pJob
,
pTask
));
}
}
...
...
@@ -811,6 +828,75 @@ void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
}
}
int32_t
schChkResetJobRetry
(
SSchJob
*
pJob
,
int32_t
rspCode
)
{
if
(
pJob
->
status
>=
JOB_TASK_STATUS_PART_SUCC
)
{
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
if
(
pJob
->
fetched
)
{
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
pJob
->
noMoreRetry
=
true
;
SCH_JOB_ELOG
(
"already fetched while got error %s"
,
tstrerror
(
rspCode
));
SCH_ERR_RET
(
rspCode
);
}
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_EXEC
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schResetJobForRetry
(
SSchJob
*
pJob
,
int32_t
rspCode
)
{
SCH_ERR_RET
(
schChkResetJobRetry
(
pJob
,
rspCode
));
int32_t
numOfLevels
=
taosArrayGetSize
(
pJob
->
levels
);
for
(
int32_t
i
=
0
;
i
<
numOfLevels
;
++
i
)
{
SSchLevel
*
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
pLevel
->
taskExecDoneNum
=
0
;
pLevel
->
taskLaunchedNum
=
0
;
int32_t
numOfTasks
=
taosArrayGetSize
(
pLevel
->
subTasks
);
for
(
int32_t
j
=
0
;
j
<
numOfTasks
;
++
j
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
j
);
SCH_LOCK_TASK
(
pTask
);
SCH_ERR_RET
(
schChkUpdateRedirectCtx
(
pJob
,
pTask
,
NULL
,
rspCode
));
qClearSubplanExecutionNode
(
pTask
->
plan
);
schResetTaskForRetry
(
pJob
,
pTask
);
SCH_UNLOCK_TASK
(
pTask
);
}
}
SCH_RESET_JOB_LEVEL_IDX
(
pJob
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schHandleJobRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
taosMemoryFreeClear
(
pMsg
->
pData
);
taosMemoryFreeClear
(
pMsg
->
pEpSet
);
SCH_UNLOCK_TASK
(
pTask
);
SCH_TASK_DLOG
(
"start to redirect all job tasks cause of error: %s"
,
tstrerror
(
rspCode
));
SCH_ERR_JRET
(
schResetJobForRetry
(
pJob
,
rspCode
));
SCH_ERR_JRET
(
schLaunchJob
(
pJob
));
SCH_LOCK_TASK
(
pTask
);
SCH_RET
(
code
);
_return:
SCH_LOCK_TASK
(
pTask
);
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
}
bool
schChkCurrentOp
(
SSchJob
*
pJob
,
int32_t
op
,
int8_t
sync
)
{
bool
r
=
false
;
SCH_LOCK
(
SCH_READ
,
&
pJob
->
opStatus
.
lock
);
...
...
@@ -907,7 +993,7 @@ int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq
SCH_ERR_RET
(
TSDB_CODE_APP_ERROR
);
}
if
(
status
!=
JOB_TASK_STATUS_PART_SUCC
)
{
if
(
status
!=
JOB_TASK_STATUS_PART_SUCC
&&
status
!=
JOB_TASK_STATUS_FETCH
)
{
SCH_JOB_ELOG
(
"job status error for fetch, status:%s"
,
jobTaskStatusStr
(
status
));
SCH_ERR_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
c91d8a59
...
...
@@ -36,7 +36,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_QW_MSG_ERROR
);
}
if
(
taskStatus
!=
JOB_TASK_STATUS_
PART_SUCC
)
{
if
(
taskStatus
!=
JOB_TASK_STATUS_
FETCH
)
{
SCH_TASK_ELOG
(
"rsp msg conflicted with task status, status:%s, rspType:%s"
,
jobTaskStatusStr
(
taskStatus
),
TMSG_INFO
(
msgType
));
SCH_ERR_RET
(
TSDB_CODE_QW_MSG_ERROR
);
...
...
@@ -137,25 +137,12 @@ int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
return
TSDB_CODE_SUCCESS
;
}
// Note: no more task error processing, handled in function internal
int32_t
schHandleResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
execId
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
schProcessResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
execId
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
char
*
msg
=
pMsg
->
pData
;
int32_t
msgSize
=
pMsg
->
len
;
int32_t
msgType
=
pMsg
->
msgType
;
bool
dropExecNode
=
(
msgType
==
TDMT_SCH_LINK_BROKEN
||
SCH_NETWORK_ERR
(
rspCode
));
if
(
SCH_IS_QUERY_JOB
(
pJob
))
{
SCH_ERR_JRET
(
schUpdateTaskHandle
(
pJob
,
pTask
,
dropExecNode
,
pMsg
->
handle
,
execId
));
}
SCH_ERR_JRET
(
schValidateRspMsgType
(
pJob
,
pTask
,
msgType
));
int32_t
reqType
=
IsReq
(
pMsg
)
?
pMsg
->
msgType
:
(
pMsg
->
msgType
-
1
);
if
(
SCH_TASK_NEED_REDIRECT
(
pTask
,
reqType
,
rspCode
,
pMsg
->
len
))
{
SCH_RET
(
schHandleRedirect
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
));
}
pTask
->
redirectCtx
.
inRedirect
=
false
;
switch
(
msgType
)
{
...
...
@@ -423,6 +410,38 @@ _return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
}
// Note: no more task error processing, handled in function internal
int32_t
schHandleResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
execId
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int32_t
msgType
=
pMsg
->
msgType
;
char
*
msg
=
pMsg
->
pData
;
bool
dropExecNode
=
(
msgType
==
TDMT_SCH_LINK_BROKEN
||
SCH_NETWORK_ERR
(
rspCode
));
if
(
SCH_IS_QUERY_JOB
(
pJob
))
{
SCH_ERR_JRET
(
schUpdateTaskHandle
(
pJob
,
pTask
,
dropExecNode
,
pMsg
->
handle
,
execId
));
}
SCH_ERR_JRET
(
schValidateRspMsgType
(
pJob
,
pTask
,
msgType
));
int32_t
reqType
=
IsReq
(
pMsg
)
?
pMsg
->
msgType
:
(
pMsg
->
msgType
-
1
);
if
(
SCH_JOB_NEED_RETRY
(
pJob
,
pTask
,
reqType
,
rspCode
))
{
SCH_RET
(
schHandleJobRetry
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
));
}
else
if
(
SCH_TASKSET_NEED_RETRY
(
pJob
,
pTask
,
reqType
,
rspCode
))
{
SCH_RET
(
schHandleTaskSetRetry
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
));
}
pTask
->
redirectCtx
.
inRedirect
=
false
;
SCH_RET
(
schProcessResponseMsg
(
pJob
,
pTask
,
execId
,
pMsg
,
rspCode
));
_return:
taosMemoryFreeClear
(
msg
);
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
}
int32_t
schHandleCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SSchTaskCallbackParam
*
pParam
=
(
SSchTaskCallbackParam
*
)
param
;
...
...
source/libs/scheduler/src/schStatus.c
浏览文件 @
c91d8a59
...
...
@@ -34,6 +34,9 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
case
JOB_TASK_STATUS_PART_SUCC
:
SCH_ERR_JRET
(
schProcessOnJobPartialSuccess
(
pJob
));
break
;
case
JOB_TASK_STATUS_FETCH
:
SCH_ERR_JRET
(
schJobFetchRows
(
pJob
));
break
;
case
JOB_TASK_STATUS_SUCC
:
break
;
case
JOB_TASK_STATUS_FAIL
:
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
c91d8a59
...
...
@@ -378,7 +378,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
if
(
lastTime
>
tsMaxRetryWaitTime
)
{
SCH_TASK_DLOG
(
"task no more redirect retry since timeout, now:%"
PRId64
", start:%"
PRId64
", max:%d, total:%d"
,
nowTs
,
pCtx
->
startTs
,
tsMaxRetryWaitTime
,
pCtx
->
totalTimes
);
SCH_ERR_RET
(
SCH_GET_REDICT_CODE
(
pJob
,
rspCode
));
pJob
->
noMoreRetry
=
true
;
SCH_ERR_RET
(
SCH_GET_REDIRECT_CODE
(
pJob
,
rspCode
));
}
pCtx
->
periodMs
*=
tsRedirectFactor
;
...
...
@@ -404,32 +405,35 @@ _return:
return
TSDB_CODE_SUCCESS
;
}
int32_t
schDoTaskRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SCH_TASK_DLOG
(
"task will be redirected now, status:%s, code:%s"
,
SCH_GET_TASK_STATUS_STR
(
pTask
),
tstrerror
(
rspCode
));
if
(
NULL
==
pData
)
{
pTask
->
retryTimes
=
0
;
}
if
(
!
NO_RET_REDIRECT_ERROR
(
rspCode
))
{
SCH_UPDATE_REDICT_CODE
(
pJob
,
rspCode
);
}
SCH_ERR_JRET
(
schChkUpdateRedirectCtx
(
pJob
,
pTask
,
pData
?
pData
->
pEpSet
:
NULL
,
rspCode
));
void
schResetTaskForRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
pTask
->
waitRetry
=
true
;
schDropTaskOnExecNode
(
pJob
,
pTask
);
if
(
pTask
->
delayTimer
)
{
taosTmrStopA
(
&
pTask
->
delayTimer
);
}
taosHashClear
(
pTask
->
execNodes
);
schRemoveTaskFromExecList
(
pJob
,
pTask
);
schDeregisterTaskHb
(
pJob
,
pTask
);
atomic_sub_fetch_32
(
&
pTask
->
level
->
taskLaunchedNum
,
1
);
taosMemoryFreeClear
(
pTask
->
msg
);
pTask
->
msgLen
=
0
;
pTask
->
lastMsgType
=
0
;
pTask
->
childReady
=
0
;
memset
(
&
pTask
->
succeedAddr
,
0
,
sizeof
(
pTask
->
succeedAddr
));
}
int32_t
schDoTaskRedirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
SCH_TASK_DLOG
(
"task will be redirected now, status:%s, code:%s"
,
SCH_GET_TASK_STATUS_STR
(
pTask
),
tstrerror
(
rspCode
));
if
(
!
NO_RET_REDIRECT_ERROR
(
rspCode
))
{
SCH_UPDATE_REDIRECT_CODE
(
pJob
,
rspCode
);
}
SCH_ERR_JRET
(
schChkUpdateRedirectCtx
(
pJob
,
pTask
,
pData
?
pData
->
pEpSet
:
NULL
,
rspCode
));
schResetTaskForRetry
(
pJob
,
pTask
);
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
))
{
if
(
pData
&&
pData
->
pEpSet
)
{
...
...
@@ -445,12 +449,6 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
SCH_TASK_DLOG
(
"switch task target node %d epset to %d/%d"
,
addr
->
nodeId
,
addr
->
epSet
.
inUse
,
addr
->
epSet
.
numOfEps
);
}
if
(
SCH_TASK_NEED_FLOW_CTRL
(
pJob
,
pTask
))
{
if
(
JOB_TASK_STATUS_EXEC
==
SCH_GET_TASK_STATUS
(
pTask
))
{
SCH_ERR_JRET
(
schLaunchTasksInFlowCtrlList
(
pJob
,
pTask
));
}
}
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_INIT
);
SCH_ERR_JRET
(
schDelayLaunchTask
(
pJob
,
pTask
));
...
...
@@ -486,20 +484,10 @@ _return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
}
int32_t
schHandle
Redirect
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
schHandle
TaskSetRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SDataBuf
*
pData
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
if
(
JOB_TASK_STATUS_PART_SUCC
==
pJob
->
status
)
{
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
if
(
pJob
->
fetched
)
{
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
SCH_TASK_ELOG
(
"already fetched while got error %s"
,
tstrerror
(
rspCode
));
SCH_ERR_JRET
(
rspCode
);
}
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
schUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_EXEC
);
}
SCH_ERR_JRET
(
schChkResetJobRetry
(
pJob
,
rspCode
));
if
(
SYNC_OTHER_LEADER_REDIRECT_ERROR
(
rspCode
))
{
if
(
NULL
==
pData
->
pEpSet
)
{
...
...
@@ -509,7 +497,19 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
}
}
SCH_TASK_DLOG
(
"start to redirect current task set cause of error: %s"
,
tstrerror
(
rspCode
));
for
(
int32_t
i
=
0
;
i
<
pJob
->
levelNum
;
++
i
)
{
SSchLevel
*
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
pLevel
->
taskExecDoneNum
=
0
;
pLevel
->
taskLaunchedNum
=
0
;
}
SCH_RESET_JOB_LEVEL_IDX
(
pJob
);
code
=
schDoTaskRedirect
(
pJob
,
pTask
,
pData
,
rspCode
);
taosMemoryFreeClear
(
pData
->
pData
);
taosMemoryFreeClear
(
pData
->
pEpSet
);
...
...
@@ -621,6 +621,13 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
*/
int32_t
schTaskCheckSetRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
bool
*
needRetry
)
{
if
(
pJob
->
noMoreRetry
)
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry since job no more retry, retryTimes:%d/%d"
,
pTask
->
retryTimes
,
pTask
->
maxRetryTimes
);
return
TSDB_CODE_SUCCESS
;
}
if
(
TSDB_CODE_SCH_TIMEOUT_ERROR
==
errCode
)
{
pTask
->
maxExecTimes
++
;
pTask
->
maxRetryTimes
++
;
...
...
@@ -645,7 +652,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return
TSDB_CODE_SUCCESS
;
}
if
(
!
SCH_NEED_RETRY
(
pTask
->
lastMsgType
,
errCode
))
{
if
(
!
SCH_
TASK_
NEED_RETRY
(
pTask
->
lastMsgType
,
errCode
))
{
*
needRetry
=
false
;
SCH_TASK_DLOG
(
"task no more retry cause of errCode, errCode:%x - %s"
,
errCode
,
tstrerror
(
errCode
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1067,7 +1074,6 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_ERR_JRET
(
TSDB_CODE_SCH_IGNORE_ERROR
);
}
// NOTE: race condition: the task should be put into the hash table before send msg to server
if
(
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_EXEC
)
{
SCH_ERR_JRET
(
schPushTaskToExecList
(
pJob
,
pTask
));
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_EXEC
);
...
...
@@ -1272,6 +1278,8 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
return
TSDB_CODE_SUCCESS
;
}
SCH_SET_TASK_STATUS
(
pJob
->
fetchTask
,
JOB_TASK_STATUS_FETCH
);
if
(
SCH_IS_LOCAL_EXEC_TASK
(
pJob
,
pJob
->
fetchTask
))
{
SCH_ERR_JRET
(
schExecLocalFetch
(
pJob
,
pJob
->
fetchTask
));
}
else
{
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
c91d8a59
...
...
@@ -91,7 +91,7 @@ int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq) {
SCH_ERR_JRET
(
schHandleOpBeginEvent
(
jobId
,
&
pJob
,
SCH_OP_FETCH
,
pReq
));
SCH_ERR_JRET
(
sch
JobFetchRows
(
pJob
));
SCH_ERR_JRET
(
sch
SwitchJobStatus
(
pJob
,
JOB_TASK_STATUS_FETCH
,
pReq
));
_return:
...
...
source/libs/transport/src/transCli.c
浏览文件 @
c91d8a59
...
...
@@ -462,6 +462,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
if
(
transQueueEmpty
(
&
pConn
->
cliMsgs
))
{
if
(
pConn
->
broken
==
true
&&
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
tTrace
(
"%s conn %p handle except, persist:0"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
if
(
T_REF_VAL_GET
(
pConn
)
>
1
)
transUnrefCliHandle
(
pConn
);
transUnrefCliHandle
(
pConn
);
return
;
}
...
...
@@ -521,6 +522,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
destroyCmsg
(
pMsg
);
tTrace
(
"%s conn %p start to destroy, ref:%d"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
T_REF_VAL_GET
(
pConn
));
}
while
(
!
transQueueEmpty
(
&
pConn
->
cliMsgs
));
if
(
T_REF_VAL_GET
(
pConn
)
>
1
)
transUnrefCliHandle
(
pConn
);
transUnrefCliHandle
(
pConn
);
}
void
cliHandleExcept
(
SCliConn
*
conn
)
{
...
...
tests/script/api/batchprepare.c
浏览文件 @
c91d8a59
...
...
@@ -122,9 +122,11 @@ int insertAUTOTest2(TAOS_STMT *stmt, TAOS *taos);
int
insertAUTOTest3
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
queryColumnTest
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
queryMiscTest
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
insertNonExistsTb
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
enum
{
TTYPE_INSERT
=
1
,
TTYPE_INSERT_NG
,
TTYPE_QUERY
,
};
...
...
@@ -187,6 +189,8 @@ CaseCfg gCase[] = {
{
"query:SUBT-COLUMN"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_QUERY
,
0
,
false
,
false
,
queryColumnTest
,
10
,
10
,
1
,
3
,
0
,
0
,
1
,
2
},
{
"query:SUBT-MISC"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_QUERY
,
0
,
false
,
false
,
queryMiscTest
,
10
,
10
,
1
,
3
,
0
,
0
,
1
,
2
},
{
"query:NG-TBNEXISTS"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT_NG
,
0
,
false
,
false
,
insertNonExistsTb
,
10
,
10
,
1
,
3
,
0
,
0
,
1
,
-
1
},
// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2},
// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2},
...
...
@@ -250,7 +254,7 @@ CaseCtrl gCaseCtrl = {
.funcIdxList = NULL,
.checkParamNum = false,
.runTimes = 0,
.caseIdx = 2
4
,
.caseIdx = 2
6
,
.caseNum = 1,
.caseRunIdx = -1,
.caseRunNum = -1,
...
...
@@ -2191,6 +2195,47 @@ int queryMiscTest(TAOS_STMT *stmt, TAOS *taos) {
}
int
insertNonExistsTb
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
)
{
BindData
data
=
{
0
};
prepareInsertData
(
&
data
);
int
code
=
taos_stmt_prepare
(
stmt
,
data
.
sql
,
0
);
if
(
code
!=
0
){
printf
(
"!!!failed to execute taos_stmt_prepare. error:%s
\n
"
,
taos_stmt_errstr
(
stmt
));
exit
(
1
);
}
bpCheckIsInsert
(
stmt
,
1
);
char
*
buf
=
"tbnexist"
;
code
=
bpSetTableNameTags
(
&
data
,
0
,
buf
,
stmt
);
if
(
code
==
0
){
printf
(
"!!!taos_stmt_set_tbname expected error not occurred
\n
"
);
exit
(
1
);
}
if
(
0
==
taos_stmt_bind_param_batch
(
stmt
,
data
.
pBind
))
{
printf
(
"!!!taos_stmt_bind_param_batch expected error not occurred
\n
"
);
exit
(
1
);
}
if
(
0
==
taos_stmt_add_batch
(
stmt
))
{
printf
(
"!!!taos_stmt_add_batch expected error not occurred
\n
"
);
exit
(
1
);
}
if
(
0
==
taos_stmt_execute
(
stmt
))
{
printf
(
"!!!taos_stmt_execute expected error not occurred
\n
"
);
exit
(
1
);
}
destroyData
(
&
data
);
return
0
;
}
int
errorSQLTest1
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
)
{
BindData
data
=
{
0
};
...
...
@@ -2213,6 +2258,10 @@ int errorSQLTest1(TAOS_STMT *stmt, TAOS *taos) {
}
void
prepareCheckResultImpl
(
TAOS
*
taos
,
char
*
tname
,
bool
printr
,
int
expected
,
bool
silent
)
{
if
(
TTYPE_INSERT_NG
==
gCurCase
->
testType
)
{
return
;
}
char
sql
[
255
]
=
"SELECT * FROM "
;
int32_t
rows
=
0
;
...
...
tests/script/tsim/query/interval-offset.sim
浏览文件 @
c91d8a59
...
...
@@ -212,10 +212,10 @@ print ===> rows2: $data20 $data21 $data22 $data23 $data24
if $rows != 3 then
return -1
endi
if $data01 !=
2
then
if $data01 !=
4
then
return -1
endi
if $data04 !=
2
then
if $data04 !=
4
then
return -1
endi
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录