Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
76567312
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
76567312
编写于
5月 19, 2022
作者:
D
dapan1121
提交者:
GitHub
5月 19, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12676 from taosdata/feature/qnode
feat: update table meta based on sversion
上级
c04890c5
fb944e85
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
229 addition
and
41 deletion
+229
-41
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+7
-0
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+1
-1
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+10
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+65
-5
source/common/src/tmsg.c
source/common/src/tmsg.c
+6
-10
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+2
-0
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+3
-0
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+104
-0
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+7
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+23
-23
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+1
-1
未找到文件。
include/libs/catalog/catalog.h
浏览文件 @
76567312
...
...
@@ -59,6 +59,11 @@ typedef struct SMetaData {
SArray
*
pQnodeList
;
// qnode list, SArray<SQueryNodeAddr>
}
SMetaData
;
typedef
struct
STbSVersion
{
char
*
tbFName
;
int32_t
sver
;
}
STbSVersion
;
typedef
struct
SCatalogCfg
{
uint32_t
maxTblCacheNum
;
uint32_t
maxDBCacheNum
;
...
...
@@ -165,6 +170,8 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg);
*/
int32_t
catalogRefreshDBVgInfo
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
);
int32_t
catalogChkTbMetaVersion
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
SArray
*
pTables
);
/**
* Force refresh a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
76567312
...
...
@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
bool
needRes
,
SQueryResult
*
pRes
);
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
SQueryResult
*
pRes
);
/**
* Process the query job, generated according to the query physical plan.
...
...
source/client/src/clientEnv.c
浏览文件 @
76567312
...
...
@@ -83,6 +83,15 @@ void closeTransporter(STscObj *pTscObj) {
rpcClose
(
pTscObj
->
pAppInfo
->
pTransporter
);
}
static
bool
clientRpcRfp
(
int32_t
code
)
{
if
(
code
==
TSDB_CODE_RPC_REDIRECT
)
{
return
true
;
}
else
{
return
false
;
}
}
// TODO refactor
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThread
)
{
SRpcInit
rpcInit
;
...
...
@@ -91,6 +100,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit
.
label
=
"TSC"
;
rpcInit
.
numOfThreads
=
numOfThread
;
rpcInit
.
cfp
=
processMsgFromServer
;
rpcInit
.
rfp
=
clientRpcRfp
;
rpcInit
.
sessions
=
1024
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
user
;
...
...
source/client/src/clientImpl.c
浏览文件 @
76567312
...
...
@@ -291,7 +291,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
NULL
!=
pRes
,
&
res
);
pRequest
->
metric
.
start
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
...
...
@@ -310,9 +310,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
}
}
if
(
pRes
)
{
*
pRes
=
res
.
res
;
}
*
pRes
=
res
.
res
;
pRequest
->
code
=
res
.
code
;
terrno
=
res
.
code
;
...
...
@@ -324,7 +322,60 @@ int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList)
return
getPlan
(
pRequest
,
pQuery
,
&
pRequest
->
body
.
pDag
,
*
pNodeList
);
}
int32_t
validateSversion
(
SRequestObj
*
pRequest
,
void
*
res
)
{
SArray
*
pArray
=
NULL
;
int32_t
code
=
0
;
if
(
TDMT_VND_SUBMIT
==
pRequest
->
type
)
{
SSubmitRsp
*
pRsp
=
(
SSubmitRsp
*
)
res
;
if
(
pRsp
->
nBlocks
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
pArray
=
taosArrayInit
(
pRsp
->
nBlocks
,
sizeof
(
STbSVersion
));
if
(
NULL
==
pArray
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
nBlocks
;
++
i
)
{
SSubmitBlkRsp
*
blk
=
pRsp
->
pBlocks
+
i
;
STbSVersion
tbSver
=
{.
tbFName
=
blk
->
tblFName
,
.
sver
=
blk
->
sver
};
taosArrayPush
(
pArray
,
&
tbSver
);
}
}
else
if
(
TDMT_VND_QUERY
==
pRequest
->
type
)
{
}
SCatalog
*
pCatalog
=
NULL
;
CHECK_CODE_GOTO
(
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
),
_return
);
SEpSet
epset
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
);
code
=
catalogChkTbMetaVersion
(
pCatalog
,
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epset
,
pArray
);
_return:
taosArrayDestroy
(
pArray
);
return
code
;
}
void
freeRequestRes
(
SRequestObj
*
pRequest
,
void
*
res
)
{
if
(
NULL
==
res
)
{
return
;
}
if
(
TDMT_VND_SUBMIT
==
pRequest
->
type
)
{
tFreeSSubmitRsp
((
SSubmitRsp
*
)
res
);
}
else
if
(
TDMT_VND_QUERY
==
pRequest
->
type
)
{
}
}
SRequestObj
*
launchQueryImpl
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
int32_t
code
,
bool
keepQuery
,
void
**
res
)
{
void
*
pRes
=
NULL
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
switch
(
pQuery
->
execMode
)
{
case
QUERY_EXEC_MODE_LOCAL
:
...
...
@@ -337,7 +388,10 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
SArray
*
pNodeList
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
));
code
=
getPlan
(
pRequest
,
pQuery
,
&
pRequest
->
body
.
pDag
,
pNodeList
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
,
pNodeList
,
res
);
code
=
scheduleQuery
(
pRequest
,
pRequest
->
body
.
pDag
,
pNodeList
,
&
pRes
);
if
(
NULL
!=
pRes
)
{
code
=
validateSversion
(
pRequest
,
pRes
);
}
}
taosArrayDestroy
(
pNodeList
);
break
;
...
...
@@ -356,6 +410,12 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
if
(
NULL
!=
pRequest
&&
TSDB_CODE_SUCCESS
!=
code
)
{
pRequest
->
code
=
terrno
;
freeRequestRes
(
pRequest
,
pRes
);
pRes
=
NULL
;
}
if
(
res
)
{
*
res
=
pRes
;
}
return
pRequest
;
...
...
source/common/src/tmsg.c
浏览文件 @
76567312
...
...
@@ -4087,10 +4087,8 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
if
(
tEncodeI32
(
pEncoder
,
pBlock
->
code
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pBlock
->
hashMeta
)
<
0
)
return
-
1
;
if
(
pBlock
->
hashMeta
)
{
if
(
tEncodeI64
(
pEncoder
,
pBlock
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pBlock
->
tblFName
)
<
0
)
return
-
1
;
}
if
(
tEncodeI64
(
pEncoder
,
pBlock
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pBlock
->
tblFName
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pEncoder
,
pBlock
->
numOfRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pEncoder
,
pBlock
->
affectedRows
)
<
0
)
return
-
1
;
if
(
tEncodeI64v
(
pEncoder
,
pBlock
->
sver
)
<
0
)
return
-
1
;
...
...
@@ -4104,12 +4102,10 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
if
(
tDecodeI32
(
pDecoder
,
&
pBlock
->
code
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pBlock
->
hashMeta
)
<
0
)
return
-
1
;
if
(
pBlock
->
hashMeta
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pBlock
->
uid
)
<
0
)
return
-
1
;
pBlock
->
tblFName
=
taosMemoryCalloc
(
TSDB_TABLE_FNAME_LEN
,
1
);
if
(
NULL
==
pBlock
->
tblFName
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pBlock
->
tblFName
)
<
0
)
return
-
1
;
}
if
(
tDecodeI64
(
pDecoder
,
&
pBlock
->
uid
)
<
0
)
return
-
1
;
pBlock
->
tblFName
=
taosMemoryCalloc
(
TSDB_TABLE_FNAME_LEN
,
1
);
if
(
NULL
==
pBlock
->
tblFName
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pBlock
->
tblFName
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pDecoder
,
&
pBlock
->
numOfRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pDecoder
,
&
pBlock
->
affectedRows
)
<
0
)
return
-
1
;
if
(
tDecodeI64v
(
pDecoder
,
&
pBlock
->
sver
)
<
0
)
return
-
1
;
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
76567312
...
...
@@ -320,6 +320,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
terrno
=
TSDB_CODE_PAR_TABLE_NOT_EXIST
;
return
-
1
;
}
strcat
(
pRsp
->
tblFName
,
mr
.
me
.
name
);
if
(
mr
.
me
.
type
==
TSDB_NORMAL_TABLE
)
{
sverNew
=
mr
.
me
.
ntbEntry
.
schema
.
sver
;
}
else
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
76567312
...
...
@@ -686,6 +686,9 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
vnodeDebugPrintSingleSubmitMsg
(
pVnode
->
pMeta
,
pBlock
,
&
msgIter
,
"real uid"
);
tDecoderClear
(
&
decoder
);
}
else
{
submitBlkRsp
.
tblFName
=
taosMemoryMalloc
(
TSDB_TABLE_FNAME_LEN
);
sprintf
(
submitBlkRsp
.
tblFName
,
"%s."
,
pVnode
->
config
.
dbname
);
}
if
(
tsdbInsertTableData
(
pVnode
->
pTsdb
,
&
msgIter
,
pBlock
,
&
submitBlkRsp
)
<
0
)
{
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
76567312
...
...
@@ -2883,6 +2883,110 @@ _return:
CTG_API_LEAVE
(
code
);
}
int32_t
ctgGetTbSverFromCache
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
int32_t
*
sver
)
{
*
sver
=
-
1
;
if
(
NULL
==
pCtg
->
dbCache
)
{
ctgDebug
(
"empty tbmeta cache, tbName:%s"
,
pTableName
->
tname
);
return
TSDB_CODE_SUCCESS
;
}
SCtgDBCache
*
dbCache
=
NULL
;
char
dbFName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
tNameGetFullDbName
(
pTableName
,
dbFName
);
ctgAcquireDBCache
(
pCtg
,
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
ctgDebug
(
"db %s not in cache"
,
pTableName
->
tname
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
tbType
=
0
;
uint64_t
suid
=
0
;
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
STableMeta
*
tbMeta
=
taosHashGet
(
dbCache
->
tbCache
.
metaCache
,
pTableName
->
tname
,
strlen
(
pTableName
->
tname
));
if
(
tbMeta
)
{
tbType
=
tbMeta
->
tableType
;
suid
=
tbMeta
->
suid
;
if
(
tbType
!=
TSDB_CHILD_TABLE
)
{
*
sver
=
tbMeta
->
sversion
;
}
}
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
metaLock
);
if
(
NULL
==
tbMeta
)
{
ctgReleaseDBCache
(
pCtg
,
dbCache
);
return
TSDB_CODE_SUCCESS
;
}
if
(
tbType
!=
TSDB_CHILD_TABLE
)
{
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgDebug
(
"Got sver %d from cache, type:%d, dbFName:%s, tbName:%s"
,
*
sver
,
tbType
,
dbFName
,
pTableName
->
tname
);
return
TSDB_CODE_SUCCESS
;
}
ctgDebug
(
"Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%"
PRIx64
,
dbFName
,
pTableName
->
tname
,
suid
);
CTG_LOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
stbLock
);
STableMeta
**
stbMeta
=
taosHashGet
(
dbCache
->
tbCache
.
stbCache
,
&
suid
,
sizeof
(
suid
));
if
(
NULL
==
stbMeta
||
NULL
==
*
stbMeta
)
{
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
stbLock
);
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgDebug
(
"stb not in stbCache, suid:%"
PRIx64
,
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
((
*
stbMeta
)
->
suid
!=
suid
)
{
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
stbLock
);
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgError
(
"stable suid in stbCache mis-match, expected suid:%"
PRIx64
",actual suid:%"
PRIx64
,
suid
,
(
*
stbMeta
)
->
suid
);
CTG_ERR_RET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
}
*
sver
=
(
*
stbMeta
)
->
sversion
;
CTG_UNLOCK
(
CTG_READ
,
&
dbCache
->
tbCache
.
stbLock
);
ctgReleaseDBCache
(
pCtg
,
dbCache
);
ctgDebug
(
"Got sver %d from cache, type:%d, dbFName:%s, tbName:%s"
,
*
sver
,
tbType
,
dbFName
,
pTableName
->
tname
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
catalogChkTbMetaVersion
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
SArray
*
pTables
)
{
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
pTrans
||
NULL
==
pMgmtEps
||
NULL
==
pTables
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
SName
name
;
int32_t
sver
=
0
;
int32_t
tbNum
=
taosArrayGetSize
(
pTables
);
for
(
int32_t
i
=
0
;
i
<
tbNum
;
++
i
)
{
STbSVersion
*
pTb
=
(
STbSVersion
*
)
taosArrayGet
(
pTables
,
i
);
tNameFromString
(
&
name
,
pTb
->
tbFName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
if
(
CTG_IS_SYS_DBNAME
(
name
.
dbname
))
{
continue
;
}
ctgGetTbSverFromCache
(
pCtg
,
&
name
,
&
sver
);
if
(
sver
>=
0
&&
sver
<
pTb
->
sver
)
{
catalogRemoveTableMeta
(
pCtg
,
&
name
);
//TODO REMOVE STB FROM CACHE
}
}
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
}
int32_t
catalogRefreshDBVgInfo
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
)
{
CTG_API_ENTER
();
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
76567312
...
...
@@ -39,6 +39,12 @@ enum {
SCH_WRITE
,
};
typedef
enum
{
SCH_RES_TYPE_QUERY
,
SCH_RES_TYPE_FETCH
,
}
SCH_RES_TYPE
;
typedef
struct
SSchTrans
{
void
*
transInst
;
void
*
transHandle
;
...
...
@@ -159,7 +165,6 @@ typedef struct SSchTask {
typedef
struct
SSchJobAttr
{
EExplainMode
explainMode
;
bool
needRes
;
bool
syncSchedule
;
bool
queryJob
;
bool
needFlowCtrl
;
...
...
@@ -192,6 +197,7 @@ typedef struct SSchJob {
int32_t
errCode
;
SArray
*
errList
;
// SArray<SQueryErrorInfo>
SRWLatch
resLock
;
SCH_RES_TYPE
resType
;
void
*
resData
;
//TODO free it or not
int32_t
resNumOfRows
;
const
char
*
sql
;
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
76567312
...
...
@@ -70,7 +70,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
}
int32_t
schInitJob
(
SSchJob
**
pSchJob
,
SQueryPlan
*
pDag
,
void
*
transport
,
SArray
*
pNodeList
,
const
char
*
sql
,
int64_t
startTs
,
bool
needRes
,
bool
syncSchedule
)
{
int64_t
startTs
,
bool
syncSchedule
)
{
int32_t
code
=
0
;
int64_t
refId
=
-
1
;
SSchJob
*
pJob
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchJob
));
...
...
@@ -81,7 +81,6 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
pJob
->
attr
.
explainMode
=
pDag
->
explainInfo
.
mode
;
pJob
->
attr
.
syncSchedule
=
syncSchedule
;
pJob
->
attr
.
needRes
=
needRes
;
pJob
->
transport
=
transport
;
pJob
->
sql
=
sql
;
...
...
@@ -1059,6 +1058,8 @@ _return:
int32_t
schProcessOnExplainDone
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SRetrieveTableRsp
*
pRsp
)
{
SCH_TASK_DLOG
(
"got explain rsp, rows:%d, complete:%d"
,
htonl
(
pRsp
->
numOfRows
),
pRsp
->
completed
);
pJob
->
resType
=
SCH_RES_TYPE_FETCH
;
atomic_store_32
(
&
pJob
->
resNumOfRows
,
htonl
(
pRsp
->
numOfRows
));
atomic_store_ptr
(
&
pJob
->
resData
,
pRsp
);
...
...
@@ -1179,23 +1180,20 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
rsp
->
affectedRows
);
SCH_TASK_DLOG
(
"submit succeed, affectedRows:%d"
,
rsp
->
affectedRows
);
if
(
pJob
->
attr
.
needRes
)
{
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
if
(
pJob
->
resData
)
{
SSubmitRsp
*
sum
=
pJob
->
resData
;
sum
->
affectedRows
+=
rsp
->
affectedRows
;
sum
->
nBlocks
+=
rsp
->
nBlocks
;
sum
->
pBlocks
=
taosMemoryRealloc
(
sum
->
pBlocks
,
sum
->
nBlocks
*
sizeof
(
*
sum
->
pBlocks
));
memcpy
(
sum
->
pBlocks
+
sum
->
nBlocks
-
rsp
->
nBlocks
,
rsp
->
pBlocks
,
rsp
->
nBlocks
*
sizeof
(
*
sum
->
pBlocks
));
taosMemoryFree
(
rsp
->
pBlocks
);
taosMemoryFree
(
rsp
);
}
else
{
pJob
->
resData
=
rsp
;
}
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
pJob
->
resType
=
SCH_RES_TYPE_QUERY
;
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
if
(
pJob
->
resData
)
{
SSubmitRsp
*
sum
=
pJob
->
resData
;
sum
->
affectedRows
+=
rsp
->
affectedRows
;
sum
->
nBlocks
+=
rsp
->
nBlocks
;
sum
->
pBlocks
=
taosMemoryRealloc
(
sum
->
pBlocks
,
sum
->
nBlocks
*
sizeof
(
*
sum
->
pBlocks
));
memcpy
(
sum
->
pBlocks
+
sum
->
nBlocks
-
rsp
->
nBlocks
,
rsp
->
pBlocks
,
rsp
->
nBlocks
*
sizeof
(
*
sum
->
pBlocks
));
taosMemoryFree
(
rsp
->
pBlocks
);
taosMemoryFree
(
rsp
);
}
else
{
tFreeSSubmitRsp
(
rsp
)
;
pJob
->
resData
=
rsp
;
}
SCH_UNLOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
}
SCH_ERR_RET
(
schProcessOnTaskSuccess
(
pJob
,
pTask
));
...
...
@@ -2412,7 +2410,7 @@ void schFreeJobImpl(void *job) {
}
static
int32_t
schExecJobImpl
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
int64_t
startTs
,
bool
needRes
,
bool
syncSchedule
)
{
int64_t
startTs
,
bool
syncSchedule
)
{
qDebug
(
"QID:0x%"
PRIx64
" job started"
,
pDag
->
queryId
);
if
(
pNodeList
==
NULL
||
taosArrayGetSize
(
pNodeList
)
<=
0
)
{
...
...
@@ -2421,7 +2419,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD
int32_t
code
=
0
;
SSchJob
*
pJob
=
NULL
;
SCH_ERR_JRET
(
schInitJob
(
&
pJob
,
pDag
,
transport
,
pNodeList
,
sql
,
startTs
,
needRes
,
syncSchedule
));
SCH_ERR_JRET
(
schInitJob
(
&
pJob
,
pDag
,
transport
,
pNodeList
,
sql
,
startTs
,
syncSchedule
));
SCH_ERR_JRET
(
schLaunchJob
(
pJob
));
...
...
@@ -2463,6 +2461,8 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
SCH_ERR_JRET
(
qExecStaticExplain
(
pDag
,
(
SRetrieveTableRsp
**
)
&
pJob
->
resData
));
pJob
->
resType
=
SCH_RES_TYPE_FETCH
;
int64_t
refId
=
taosAddRef
(
schMgmt
.
jobRef
,
pJob
);
if
(
refId
<
0
)
{
SCH_JOB_ELOG
(
"taosAddRef job failed, error:%s"
,
tstrerror
(
terrno
));
...
...
@@ -2535,7 +2535,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
}
int32_t
schedulerExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryPlan
*
pDag
,
int64_t
*
pJob
,
const
char
*
sql
,
int64_t
startTs
,
bool
needRes
,
SQueryResult
*
pRes
)
{
int64_t
startTs
,
SQueryResult
*
pRes
)
{
if
(
NULL
==
transport
||
NULL
==
pDag
||
NULL
==
pDag
->
pSubplans
||
NULL
==
pJob
||
NULL
==
pRes
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
...
...
@@ -2543,14 +2543,14 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
if
(
EXPLAIN_MODE_STATIC
==
pDag
->
explainInfo
.
mode
)
{
SCH_ERR_RET
(
schExecStaticExplain
(
transport
,
nodeList
,
pDag
,
pJob
,
sql
,
true
));
}
else
{
SCH_ERR_RET
(
schExecJobImpl
(
transport
,
nodeList
,
pDag
,
pJob
,
sql
,
startTs
,
needRes
,
true
));
SCH_ERR_RET
(
schExecJobImpl
(
transport
,
nodeList
,
pDag
,
pJob
,
sql
,
startTs
,
true
));
}
SSchJob
*
job
=
schAcquireJob
(
*
pJob
);
pRes
->
code
=
atomic_load_32
(
&
job
->
errCode
);
pRes
->
numOfRows
=
job
->
resNumOfRows
;
if
(
needRes
)
{
if
(
SCH_RES_TYPE_QUERY
==
job
->
resType
)
{
pRes
->
res
=
job
->
resData
;
job
->
resData
=
NULL
;
}
...
...
@@ -2568,7 +2568,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD
if
(
EXPLAIN_MODE_STATIC
==
pDag
->
explainInfo
.
mode
)
{
SCH_ERR_RET
(
schExecStaticExplain
(
transport
,
pNodeList
,
pDag
,
pJob
,
sql
,
false
));
}
else
{
SCH_ERR_RET
(
schExecJobImpl
(
transport
,
pNodeList
,
pDag
,
pJob
,
sql
,
0
,
false
,
false
));
SCH_ERR_RET
(
schExecJobImpl
(
transport
,
pNodeList
,
pDag
,
pJob
,
sql
,
0
,
false
));
}
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
76567312
...
...
@@ -985,7 +985,7 @@ TEST(insertTest, normalCase) {
taosThreadCreate
(
&
(
thread1
),
&
thattr
,
schtSendRsp
,
&
insertJobRefId
);
SQueryResult
res
=
{
0
};
code
=
schedulerExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
insertJobRefId
,
"insert into tb values(now,1)"
,
0
,
false
,
&
res
);
code
=
schedulerExecJob
(
mockPointer
,
qnodeList
,
&
dag
,
&
insertJobRefId
,
"insert into tb values(now,1)"
,
0
,
&
res
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
res
.
numOfRows
,
20
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录