Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
dad8b812
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
dad8b812
编写于
3月 12, 2022
作者:
D
dapan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/scheduler
上级
260aad80
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
228 addition
and
51 deletion
+228
-51
include/common/tmsg.h
include/common/tmsg.h
+5
-2
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+2
-0
include/libs/qcom/query.h
include/libs/qcom/query.h
+5
-2
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+32
-12
source/common/src/tmsg.c
source/common/src/tmsg.c
+54
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+13
-2
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+47
-18
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+5
-1
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+1
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+64
-13
未找到文件。
include/common/tmsg.h
浏览文件 @
dad8b812
...
...
@@ -1375,13 +1375,16 @@ typedef struct {
SArray
*
pArray
;
}
SVCreateTbBatchReq
;
int32_t
tSerializeSVCreateTbBatchReq
(
void
**
buf
,
SVCreateTbBatchReq
*
pReq
);
void
*
tDeserializeSVCreateTbBatchReq
(
void
*
buf
,
SVCreateTbBatchReq
*
pReq
);
typedef
struct
{
SArray
*
rspList
;
// SArray<SVCreateTbRsp>
}
SVCreateTbBatchRsp
;
int32_t
tSerializeSVCreateTbBatchReq
(
void
**
buf
,
SVCreateTbBatchReq
*
pReq
);
void
*
tDeserializeSVCreateTbBatchReq
(
void
*
buf
,
SVCreateTbBatchReq
*
pReq
);
int32_t
tSerializeSVCreateTbBatchRsp
(
void
*
buf
,
int32_t
bufLen
,
SVCreateTbBatchRsp
*
pRsp
);
int32_t
tDeserializeSVCreateTbBatchRsp
(
void
*
buf
,
int32_t
bufLen
,
SVCreateTbBatchRsp
*
pRsp
);
typedef
struct
{
int64_t
ver
;
...
...
include/libs/catalog/catalog.h
浏览文件 @
dad8b812
...
...
@@ -112,6 +112,8 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t d
int32_t
catalogRemoveDB
(
SCatalog
*
pCatalog
,
const
char
*
dbName
,
uint64_t
dbId
);
int32_t
catalogRemoveTableMeta
(
SCatalog
*
pCtg
,
SName
*
pTableName
);
int32_t
catalogRemoveStbMeta
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
uint64_t
dbId
,
const
char
*
stbName
,
uint64_t
suid
);
/**
...
...
include/libs/qcom/query.h
浏览文件 @
dad8b812
...
...
@@ -181,8 +181,11 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define IS_CLIENT_RETRY_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH)
#define IS_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT)
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH)
#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code))
#define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT)
#define qFatal(...) \
do { \
...
...
source/client/src/clientImpl.c
浏览文件 @
dad8b812
...
...
@@ -299,9 +299,10 @@ int32_t clientProcessErrorList(SArray **pList) {
SRequestObj
*
execQuery
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
)
{
SRequestObj
*
pRequest
=
NULL
;
int32_t
code
=
0
;
bool
quit
=
false
;
int32_t
needRetryNum
=
0
;
int32_t
needRetryFailNum
=
0
;
while
(
!
quit
)
{
while
(
true
)
{
pRequest
=
execQueryImpl
(
pTscObj
,
sql
,
sqlLen
);
if
(
TSDB_CODE_SUCCESS
==
pRequest
->
code
||
NULL
==
pRequest
->
errList
)
{
break
;
...
...
@@ -315,30 +316,49 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
int32_t
errNum
=
(
int32_t
)
taosArrayGetSize
(
pRequest
->
errList
);
for
(
int32_t
i
=
0
;
i
<
errNum
;
++
i
)
{
SQueryErrorInfo
*
errInfo
=
taosArrayGet
(
pRequest
->
errList
,
i
);
int32_t
tcode
=
0
;
if
(
TSDB_CODE_VND_HASH_MISMATCH
==
errInfo
->
code
)
{
if
(
NEED_CLIENT_REFRESH_VG_ERROR
(
errInfo
->
code
))
{
++
needRetryNum
;
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
quit
=
true
;
break
;
tcode
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
tcode
!=
TSDB_CODE_SUCCESS
)
{
++
needRetryFailNum
;
code
=
tcode
;
continue
;
}
SEpSet
epset
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
&
errInfo
->
tableName
,
dbFName
);
code
=
catalogRefreshDBVgInfo
(
pCatalog
,
pTscObj
->
pAppInfo
->
pTransporter
,
&
epset
,
dbFName
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
quit
=
true
;
break
;
tcode
=
catalogRefreshDBVgInfo
(
pCatalog
,
pTscObj
->
pAppInfo
->
pTransporter
,
&
epset
,
dbFName
);
if
(
tcode
!=
TSDB_CODE_SUCCESS
)
{
++
needRetryFailNum
;
code
=
tcode
;
continue
;
}
}
else
if
(
NEED_CLIENT_RM_TBLMETA_ERROR
(
errInfo
->
code
))
{
SCatalog
*
pCatalog
=
NULL
;
tcode
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
tcode
!=
TSDB_CODE_SUCCESS
)
{
code
=
tcode
;
continue
;
}
catalogRemoveTableMeta
(
pCatalog
,
&
errInfo
->
tableName
);
}
}
if
(
!
quit
)
{
if
((
needRetryNum
&&
(
0
==
needRetryFailNum
)
&&
(
TDMT_VND_SUBMIT
!=
pRequest
->
type
&&
TDMT_VND_CREATE_TABLE
!=
pRequest
->
type
))
||
(
needRetryNum
&&
(
needRetryNum
>
needRetryFailNum
)
&&
(
TDMT_VND_SUBMIT
==
pRequest
->
type
&&
TDMT_VND_CREATE_TABLE
==
pRequest
->
type
)))
{
destroyRequest
(
pRequest
);
continue
;
}
break
;
}
if
(
code
)
{
...
...
source/common/src/tmsg.c
浏览文件 @
dad8b812
...
...
@@ -2656,6 +2656,60 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR
return
0
;
}
int32_t
tSerializeSVCreateTbBatchRsp
(
void
*
buf
,
int32_t
bufLen
,
SVCreateTbBatchRsp
*
pRsp
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
pRsp
->
rspList
)
{
int32_t
num
=
taosArrayGetSize
(
pRsp
->
rspList
);
if
(
tEncodeI32
(
&
encoder
,
num
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SVCreateTbRsp
*
rsp
=
taosArrayGet
(
pRsp
->
rspList
,
i
);
if
(
tEncodeI32
(
&
encoder
,
rsp
->
code
)
<
0
)
return
-
1
;
if
(
tEncodeU8
(
&
encoder
,
rsp
->
tableName
.
type
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
rsp
->
tableName
.
acctId
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
rsp
->
tableName
.
dbname
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
rsp
->
tableName
.
tname
)
<
0
)
return
-
1
;
}
}
else
{
if
(
tEncodeI32
(
&
encoder
,
0
)
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSVCreateTbBatchRsp
(
void
*
buf
,
int32_t
bufLen
,
SVCreateTbBatchRsp
*
pRsp
)
{
SCoder
decoder
=
{
0
};
int32_t
num
=
0
;
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
num
)
<
0
)
return
-
1
;
if
(
num
>
0
)
{
pRsp
->
rspList
=
taosArrayInit
(
num
,
sizeof
(
SVCreateTbRsp
));
if
(
NULL
==
pRsp
->
rspList
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SVCreateTbRsp
rsp
=
{
0
};
if
(
tDecodeI32
(
&
decoder
,
&
rsp
.
code
)
<
0
)
return
-
1
;
if
(
tDecodeU8
(
&
decoder
,
&
rsp
.
tableName
.
type
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
rsp
.
tableName
.
acctId
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
rsp
.
tableName
.
dbname
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
rsp
.
tableName
.
tname
)
<
0
)
return
-
1
;
if
(
NULL
==
taosArrayPush
(
pRsp
->
rspList
,
&
rsp
))
return
-
1
;
}
}
else
{
pRsp
->
rspList
=
NULL
;
}
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSVCreateTSmaReq
(
void
**
buf
,
SVCreateTSmaReq
*
pReq
)
{
int32_t
tlen
=
0
;
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
dad8b812
...
...
@@ -87,7 +87,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
#if 0
char tableFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pCreateTbReq->name, tableFName);
#endif
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName);
if (code) {
...
...
@@ -107,6 +106,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
taosArrayPush(vCreateTbBatchRsp.rspList, &rsp);
}
#endif
if
(
metaCreateTable
(
pVnode
->
pMeta
,
pCreateTbReq
)
<
0
)
{
// TODO: handle error
vError
(
"vgId:%d, failed to create table: %s"
,
pVnode
->
vgId
,
pCreateTbReq
->
name
);
...
...
@@ -125,7 +126,17 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace
(
"vgId:%d process create %"
PRIzu
" tables"
,
pVnode
->
vgId
,
taosArrayGetSize
(
vCreateTbBatchReq
.
pArray
));
taosArrayDestroy
(
vCreateTbBatchReq
.
pArray
);
if
(
vCreateTbBatchRsp
.
rspList
)
{
int32_t
contLen
=
tSerializeSVCreateTbBatchRsp
(
NULL
,
0
,
&
vCreateTbBatchRsp
);
void
*
msg
=
rpcMallocCont
(
contLen
);
tSerializeSVCreateTbBatchRsp
(
msg
,
contLen
,
&
vCreateTbBatchRsp
);
taosArrayDestroy
(
vCreateTbBatchRsp
.
rspList
);
*
pRsp
=
calloc
(
1
,
sizeof
(
SRpcMsg
));
(
*
pRsp
)
->
msgType
=
TDMT_VND_CREATE_TABLE_RSP
;
(
*
pRsp
)
->
pCont
=
msg
;
(
*
pRsp
)
->
contLen
=
contLen
;
(
*
pRsp
)
->
handle
=
pMsg
->
handle
;
(
*
pRsp
)
->
ahandle
=
pMsg
->
ahandle
;
}
break
;
}
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
dad8b812
...
...
@@ -851,18 +851,11 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetTableTypeFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
int32_t
*
tbType
,
int32_t
flag
)
{
int32_t
ctgGetTableTypeFromCache
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
const
char
*
tableName
,
int32_t
*
tbType
)
{
if
(
NULL
==
pCtg
->
dbCache
)
{
ctgWarn
(
"empty db cache,
tbName:%s"
,
pTableName
->
tn
ame
);
ctgWarn
(
"empty db cache,
dbFName:%s, tbName:%s"
,
dbFName
,
tableN
ame
);
return
TSDB_CODE_SUCCESS
;
}
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
if
(
CTG_FLAG_IS_INF_DB
(
flag
))
{
strcpy
(
dbFName
,
pTableName
->
dbname
);
}
else
{
tNameGetFullDbName
(
pTableName
,
dbFName
);
}
SCtgDBCache
*
dbCache
=
NULL
;
ctgAcquireDBCache
(
pCtg
,
dbFName
,
&
dbCache
);
...
...
@@ -871,11 +864,11 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_
}
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
STableMeta
*
pTableMeta
=
(
STableMeta
*
)
taosHashAcquire
(
dbCache
->
tbCache
.
metaCache
,
pTableName
->
tname
,
strlen
(
pTableName
->
tn
ame
));
STableMeta
*
pTableMeta
=
(
STableMeta
*
)
taosHashAcquire
(
dbCache
->
tbCache
.
metaCache
,
tableName
,
strlen
(
tableN
ame
));
if
(
NULL
==
pTableMeta
)
{
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
ctgWarn
(
"tbl not in cache, dbFName:%s, tbName:%s"
,
dbFName
,
pTableName
->
tn
ame
);
ctgWarn
(
"tbl not in cache, dbFName:%s, tbName:%s"
,
dbFName
,
tableN
ame
);
ctgReleaseDBCache
(
pCtg
,
dbCache
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -889,7 +882,7 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgDebug
(
"Got tbtype from cache, dbFName:%s, tbName:%s, type:%d"
,
dbFName
,
pTableName
->
tn
ame
,
*
tbType
);
ctgDebug
(
"Got tbtype from cache, dbFName:%s, tbName:%s, type:%d"
,
dbFName
,
tableN
ame
,
*
tbType
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2074,24 +2067,19 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) {
return
TSDB_CODE_SUCCESS
;
}
if
(
dbCache
->
dbId
!=
msg
->
dbId
)
{
if
(
msg
->
dbId
&&
(
dbCache
->
dbId
!=
msg
->
dbId
)
)
{
ctgDebug
(
"dbId already modified, dbFName:%s, current:%"
PRIx64
", dbId:%"
PRIx64
", stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
dbCache
->
dbId
,
msg
->
dbId
,
msg
->
stbName
,
msg
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
if
(
taosHashRemove
(
dbCache
->
tbCache
.
stbCache
,
&
msg
->
suid
,
sizeof
(
msg
->
suid
)))
{
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
ctgDebug
(
"stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
if
(
taosHashRemove
(
dbCache
->
tbCache
.
metaCache
,
msg
->
stbName
,
strlen
(
msg
->
stbName
)))
{
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
tbCache
.
stbLock
);
ctgError
(
"stb not exist in cache, dbFName:%s, stb:%s, suid:%"
PRIx64
,
msg
->
dbFName
,
msg
->
stbName
,
msg
->
suid
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
...
...
@@ -2543,6 +2531,47 @@ _return:
CTG_API_LEAVE
(
code
);
}
int32_t
catalogRemoveTableMeta
(
SCatalog
*
pCtg
,
SName
*
pTableName
)
{
CTG_API_ENTER
();
int32_t
code
=
0
;
if
(
NULL
==
pCtg
||
NULL
==
pTableName
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
if
(
NULL
==
pCtg
->
dbCache
)
{
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
STableMeta
*
tblMeta
=
NULL
;
int32_t
exist
=
0
;
uint64_t
dbId
=
0
;
CTG_ERR_JRET
(
ctgGetTableMetaFromCache
(
pCtg
,
pTableName
,
&
tblMeta
,
&
exist
,
0
,
&
dbId
));
if
(
0
==
exist
)
{
ctgDebug
(
"table already not in cache, db:%s, tblName:%s"
,
pTableName
->
dbname
,
pTableName
->
tname
);
goto
_return
;
}
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
pTableName
,
dbFName
);
if
(
TSDB_SUPER_TABLE
==
tblMeta
->
tableType
)
{
CTG_ERR_JRET
(
ctgPushRmStbMsgInQueue
(
pCtg
,
dbFName
,
dbId
,
pTableName
->
tname
,
tblMeta
->
suid
));
}
else
{
CTG_ERR_JRET
(
ctgPushRmTblMsgInQueue
(
pCtg
,
dbFName
,
dbId
,
pTableName
->
tname
));
}
_return:
tfree
(
tblMeta
);
CTG_API_LEAVE
(
code
);
}
int32_t
catalogRemoveStbMeta
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
uint64_t
dbId
,
const
char
*
stbName
,
uint64_t
suid
)
{
CTG_API_ENTER
();
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
dad8b812
...
...
@@ -8114,7 +8114,11 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
size_t
numOfCols
=
LIST_LENGTH
(
pScanPhyNode
->
pScanCols
);
tsdbReaderT
pDataReader
=
doCreateDataReader
((
STableScanPhysiNode
*
)
pPhyNode
,
pHandle
,
(
uint64_t
)
queryId
,
taskId
);
if
(
NULL
==
pDataReader
)
{
errInfo
->
code
=
terrno
;
errInfo
->
tableName
=
pScanPhyNode
->
tableName
;
return
NULL
;
}
int32_t
code
=
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
return
createTableScanOperatorInfo
(
pDataReader
,
pScanPhyNode
->
order
,
numOfCols
,
pScanPhyNode
->
count
,
pScanPhyNode
->
reverse
,
pTaskInfo
);
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
dad8b812
...
...
@@ -223,7 +223,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t
schLaunchTasksInFlowCtrlList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schLaunchTaskImpl
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schFetchFromRemote
(
SSchJob
*
pJob
);
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
S
QueryErrorInfo
*
errInfo
);
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
S
Array
*
errList
);
#ifdef __cplusplus
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
dad8b812
...
...
@@ -729,8 +729,8 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
tsem_post
(
&
job
->
rspSem
);
}
int32_t
schPushToErrInfoList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
S
QueryErrorInfo
*
errInfo
)
{
if
(
NULL
==
err
Info
||
!
SCH_IS_DATA_SRC_TASK
(
pTask
)
||
!
IS_CLIENT_RETRY_ERROR
(
errInfo
->
code
))
{
int32_t
schPushToErrInfoList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
S
Array
*
errList
)
{
if
(
NULL
==
err
List
||
!
SCH_IS_DATA_SRC_TASK
(
pTask
))
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -743,10 +743,20 @@ int32_t schPushToErrInfoList(SSchJob *pJob, SSchTask *pTask, SQueryErrorInfo *er
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
if
(
NULL
==
taosArrayPush
(
pJob
->
errList
,
errInfo
))
{
SCH_TASK_ELOG
(
"taosArrayPush errInfo to list failed, errCode:%x"
,
errInfo
->
code
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SQueryErrorInfo
*
errInfo
=
NULL
;
int32_t
errNum
=
taosArrayGetSize
(
errList
);
for
(
int32_t
i
=
0
;
i
<
errNum
;
++
i
)
{
errInfo
=
taosArrayGet
(
errList
,
i
);
if
(
!
NEED_CLIENT_HANDLE_ERROR
(
errInfo
->
code
))
{
continue
;
}
if
(
NULL
==
taosArrayPush
(
pJob
->
errList
,
errInfo
))
{
SCH_TASK_ELOG
(
"taosArrayPush errInfo to list failed, errCode:%x"
,
errInfo
->
code
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -754,12 +764,13 @@ int32_t schPushToErrInfoList(SSchJob *pJob, SSchTask *pTask, SQueryErrorInfo *er
// Note: no more task error processing, handled in function internal
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
S
QueryErrorInfo
*
errInfo
)
{
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
S
Array
*
errList
)
{
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_DLOG
(
"task failed not processed cause of job status, job status:%d"
,
status
);
taosArrayDestroy
(
errList
);
SCH_RET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
...
...
@@ -784,7 +795,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode,
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_FAILED
);
SCH_ERR_JRET
(
schPushToErrInfoList
(
pJob
,
pTask
,
err
Info
));
SCH_ERR_JRET
(
schPushToErrInfoList
(
pJob
,
pTask
,
err
List
));
if
(
SCH_IS_WAIT_ALL_JOB
(
pJob
))
{
SCH_LOCK
(
SCH_WRITE
,
&
pTask
->
level
->
lock
);
...
...
@@ -795,11 +806,14 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode,
atomic_store_32
(
&
pJob
->
errCode
,
errCode
);
if
(
taskDone
<
pTask
->
level
->
taskNum
)
{
SCH_TASK_DLOG
(
"not all tasks done, done:%d, all:%d"
,
taskDone
,
pTask
->
level
->
taskNum
);
SCH_ERR_RET
(
errCode
);
SCH_TASK_DLOG
(
"need to wait other tasks, doneNum:%d, allNum:%d"
,
taskDone
,
pTask
->
level
->
taskNum
);
taosArrayDestroy
(
errList
);
SCH_RET
(
errCode
);
}
}
}
else
{
taosArrayDestroy
(
errList
);
SCH_ERR_JRET
(
schHandleTaskRetry
(
pJob
,
pTask
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -807,6 +821,8 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode,
_return:
taosArrayDestroy
(
errList
);
SCH_RET
(
schProcessOnJobFailure
(
pJob
,
errCode
));
}
...
...
@@ -924,6 +940,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
int8_t
status
=
0
;
bool
errInfoGot
=
false
;
SQueryErrorInfo
errInfo
=
{
0
};
SArray
*
errList
=
NULL
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_ELOG
(
"rsp not processed cause of job status, job status:%d"
,
status
);
...
...
@@ -935,6 +952,32 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
switch
(
msgType
)
{
case
TDMT_VND_CREATE_TABLE_RSP
:
{
SVCreateTbBatchRsp
batchRsp
=
{
0
};
if
(
msg
)
{
tDeserializeSVCreateTbBatchRsp
(
msg
,
msgSize
,
&
batchRsp
);
if
(
batchRsp
.
rspList
)
{
int32_t
num
=
taosArrayGetSize
(
batchRsp
.
rspList
);
errList
=
taosArrayInit
(
num
,
sizeof
(
SQueryErrorInfo
));
if
(
NULL
==
errList
)
{
SCH_TASK_ELOG
(
"taskArrayInit %d errInfo failed"
,
num
);
taosArrayDestroy
(
batchRsp
.
rspList
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SVCreateTbRsp
*
rsp
=
taosArrayGet
(
batchRsp
.
rspList
,
i
);
errInfo
.
code
=
rsp
->
code
;
errInfo
.
tableName
=
rsp
->
tableName
;
taosArrayPush
(
errList
,
&
errInfo
);
}
taosArrayDestroy
(
batchRsp
.
rspList
);
errInfoGot
=
true
;
}
}
SCH_ERR_JRET
(
rspCode
);
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
...
...
@@ -942,7 +985,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
case
TDMT_VND_SUBMIT_RSP
:
{
#if 0 //TODO OPEN THIS
SS
hellSubmitRspMsg *rsp = (SShellSubmitRspMsg
*)msg;
SS
ubmitRsp *rsp = (SSubmitRsp
*)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
...
...
@@ -969,6 +1012,14 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if
(
rsp
.
code
)
{
errInfo
.
code
=
rsp
.
code
;
errInfo
.
tableName
=
rsp
.
tableName
;
errList
=
taosArrayInit
(
1
,
sizeof
(
SQueryErrorInfo
));
if
(
NULL
==
errList
)
{
SCH_TASK_ELOG
(
"taskArrayInit %d errInfo failed"
,
1
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
taosArrayPush
(
errList
,
&
errInfo
);
errInfoGot
=
true
;
}
...
...
@@ -1039,7 +1090,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
_return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
,
errInfoGot
?
&
errInfo
:
NULL
));
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
,
errInfoGot
?
errList
:
NULL
));
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录