Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d7406cab
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d7406cab
编写于
3月 11, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/feature/scheduler' into feature/3.0_query_integrate_wxy
上级
028e6d16
732a0b2c
变更
35
展开全部
隐藏空白更改
内联
并排
Showing
35 changed file
with
812 addition
and
381 deletion
+812
-381
include/common/tmsg.h
include/common/tmsg.h
+7
-0
include/common/tname.h
include/common/tname.h
+2
-2
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+19
-10
include/libs/executor/executor.h
include/libs/executor/executor.h
+3
-1
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+1
-1
include/libs/qcom/query.h
include/libs/qcom/query.h
+10
-0
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+5
-4
include/util/taoserror.h
include/util/taoserror.h
+2
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+96
-10
source/common/src/tmsg.c
source/common/src/tmsg.c
+34
-0
source/common/src/tname.c
source/common/src/tname.c
+21
-9
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+20
-2
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+3
-0
source/dnode/vnode/src/vnd/vnodeCfg.c
source/dnode/vnode/src/vnd/vnodeCfg.c
+20
-1
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+12
-4
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+18
-8
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+375
-264
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+2
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-1
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+2
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+23
-5
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+1
-1
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+1
-1
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+14
-0
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+6
-0
source/libs/qworker/inc/qworkerMsg.h
source/libs/qworker/inc/qworkerMsg.h
+1
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+4
-3
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+11
-5
source/libs/qworker/test/qworkerTests.cpp
source/libs/qworker/test/qworkerTests.cpp
+1
-1
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+11
-7
source/libs/scheduler/src/schFlowCtrl.c
source/libs/scheduler/src/schFlowCtrl.c
+1
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+80
-33
source/util/src/terror.c
source/util/src/terror.c
+3
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
d7406cab
...
...
@@ -24,6 +24,7 @@
#include "thash.h"
#include "tlist.h"
#include "trow.h"
#include "tname.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -459,8 +460,14 @@ typedef struct {
typedef
struct
{
int32_t
code
;
SName
tableName
;
}
SQueryTableRsp
;
int32_t
tSerializeSQueryTableRsp
(
void
*
buf
,
int32_t
bufLen
,
SQueryTableRsp
*
pRsp
);
int32_t
tDeserializeSQueryTableRsp
(
void
*
buf
,
int32_t
bufLen
,
SQueryTableRsp
*
pRsp
);
typedef
struct
{
char
db
[
TSDB_DB_FNAME_LEN
];
int32_t
numOfVgroups
;
...
...
include/common/tname.h
浏览文件 @
d7406cab
...
...
@@ -17,7 +17,6 @@
#define _TD_COMMON_NAME_H_
#include "tdef.h"
#include "tmsg.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -61,7 +60,8 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type);
int32_t
tNameSetAcctId
(
SName
*
dst
,
int32_t
acctId
);
SSchema
createSchema
(
uint8_t
type
,
int32_t
bytes
,
int32_t
colId
,
const
char
*
name
);
bool
tNameDBNameEqual
(
SName
*
left
,
SName
*
right
);
#ifdef __cplusplus
}
...
...
include/libs/catalog/catalog.h
浏览文件 @
d7406cab
...
...
@@ -103,11 +103,10 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param forceUpdate (input, force update db vgroup info from mnode)
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t
catalogGetDBVgInfo
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
bool
forceUpdate
,
SArray
**
pVgroupList
);
int32_t
catalogGetDBVgInfo
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
char
*
pDBName
,
SArray
**
pVgroupList
);
int32_t
catalogUpdateDBVgInfo
(
SCatalog
*
pCatalog
,
const
char
*
dbName
,
uint64_t
dbId
,
SDBVgInfo
*
dbInfo
);
...
...
@@ -120,7 +119,7 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
...
...
@@ -131,7 +130,7 @@ int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSe
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
...
...
@@ -140,28 +139,38 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpS
int32_t
catalogUpdateSTableMeta
(
SCatalog
*
pCatalog
,
STableMetaRsp
*
rspMsg
);
/**
* Force refresh DB's local cached vgroup info.
* @param pCtg (input, got with catalogGetHandle)
* @param pTrans (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param dbFName (input, db full name)
* @return error code
*/
int32_t
catalogRefreshDBVgInfo
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
);
/**
* Force refresh a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t
catalogRefreshTableMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
);
int32_t
catalogRefreshTableMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
);
/**
* Force refresh a table's local cached meta data and get the new one.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t
catalogRefreshGetTableMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
);
int32_t
catalogRefreshGetTableMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
);
...
...
@@ -170,7 +179,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg);
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
...
...
@@ -181,7 +190,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param vgInfo (output, vgroup info)
* @return error code
*/
...
...
include/libs/executor/executor.h
浏览文件 @
d7406cab
...
...
@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "tcommon.h"
#include "query.h"
typedef
void
*
qTaskInfo_t
;
typedef
void
*
DataSinkHandle
;
...
...
@@ -30,6 +31,7 @@ struct SSubplan;
typedef
struct
SReadHandle
{
void
*
reader
;
void
*
meta
;
void
*
config
;
}
SReadHandle
;
/**
...
...
@@ -67,7 +69,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
* @param qId
* @return
*/
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
);
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
SQueryErrorInfo
*
errInfo
);
/**
* The main task execution function, including query on both table and multiple tables,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
d7406cab
...
...
@@ -106,7 +106,7 @@ typedef struct SSubLogicPlan {
}
SSubLogicPlan
;
typedef
struct
SQueryLogicPlan
{
ENodeType
type
;
;
ENodeType
type
;
int32_t
totalLevel
;
SNodeList
*
pTopSubplans
;
}
SQueryLogicPlan
;
...
...
include/libs/qcom/query.h
浏览文件 @
d7406cab
...
...
@@ -134,6 +134,11 @@ typedef struct SQueryNodeStat {
int32_t
tableNum
;
// vg table number, unit is TSDB_TABLE_NUM_UNIT
}
SQueryNodeStat
;
typedef
struct
SQueryErrorInfo
{
int32_t
code
;
SName
tableName
;
}
SQueryErrorInfo
;
int32_t
initTaskQueue
();
int32_t
cleanupTaskQueue
();
...
...
@@ -166,6 +171,8 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_
int32_t
queryCreateTableMetaFromMsg
(
STableMetaRsp
*
msg
,
bool
isSuperTable
,
STableMeta
**
pMeta
);
SSchema
createSchema
(
uint8_t
type
,
int32_t
bytes
,
int32_t
colId
,
const
char
*
name
);
extern
int32_t
(
*
queryBuildMsg
[
TDMT_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TDMT_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
...
...
@@ -174,6 +181,9 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define IS_CLIENT_RETRY_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH)
#define IS_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT)
#define qFatal(...) \
do { \
if (qDebugFlag & DEBUG_FATAL) { \
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
d7406cab
...
...
@@ -52,10 +52,11 @@ typedef struct SQueryProfileSummary {
}
SQueryProfileSummary
;
typedef
struct
SQueryResult
{
int32_t
code
;
uint64_t
numOfRows
;
int32_t
msgSize
;
char
*
msg
;
int32_t
code
;
SArray
*
errList
;
// SArray<SQueryErrorInfo>
uint64_t
numOfRows
;
int32_t
msgSize
;
char
*
msg
;
}
SQueryResult
;
typedef
struct
STaskInfo
{
...
...
include/util/taoserror.h
浏览文件 @
d7406cab
...
...
@@ -328,6 +328,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513)
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514)
#define TSDB_CODE_VND_TB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0515)
#define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0516)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
...
...
@@ -454,6 +455,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404)
#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405)
#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406)
#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407)
//scheduler
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
...
...
source/client/inc/clientInt.h
浏览文件 @
d7406cab
...
...
@@ -184,6 +184,7 @@ typedef struct SRequestObj {
char
*
msgBuf
;
void
*
pInfo
;
// sql parse info, generated by parser module
int32_t
code
;
SArray
*
errList
;
// SArray<SQueryErrorInfo>
SQueryExecMetric
metric
;
SRequestSendRecvBody
body
;
}
SRequestObj
;
...
...
source/client/src/clientImpl.c
浏览文件 @
d7406cab
...
...
@@ -229,6 +229,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
}
pRequest
->
errList
=
res
.
errList
;
pRequest
->
code
=
code
;
return
pRequest
->
code
;
}
...
...
@@ -240,19 +241,13 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
}
}
pRequest
->
errList
=
res
.
errList
;
pRequest
->
code
=
res
.
code
;
return
pRequest
->
code
;
}
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
if
(
sqlLen
>
(
size_t
)
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
return
NULL
;
}
SRequestObj
*
execQueryImpl
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
)
{
SRequestObj
*
pRequest
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SArray
*
pNodeList
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
));
...
...
@@ -279,6 +274,97 @@ _return:
return
pRequest
;
}
int32_t
clientProcessErrorList
(
SArray
**
pList
)
{
SArray
*
errList
=
*
pList
;
int32_t
errNum
=
(
int32_t
)
taosArrayGetSize
(
errList
);
for
(
int32_t
i
=
0
;
i
<
errNum
;
++
i
)
{
SQueryErrorInfo
*
errInfo
=
taosArrayGet
(
errList
,
i
);
if
(
TSDB_CODE_VND_HASH_MISMATCH
==
errInfo
->
code
)
{
if
(
i
==
(
errNum
-
1
))
{
break
;
}
// TODO REMOVE SAME DB ERROR
}
else
{
taosArrayRemove
(
errList
,
i
);
--
i
;
--
errNum
;
}
}
if
(
0
==
errNum
)
{
taosArrayDestroy
(
*
pList
);
*
pList
=
NULL
;
}
return
TSDB_CODE_SUCCESS
;
}
SRequestObj
*
execQuery
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
)
{
SRequestObj
*
pRequest
=
NULL
;
int32_t
code
=
0
;
bool
quit
=
false
;
while
(
!
quit
)
{
pRequest
=
execQueryImpl
(
pTscObj
,
sql
,
sqlLen
);
if
(
TSDB_CODE_SUCCESS
==
pRequest
->
code
||
NULL
==
pRequest
->
errList
)
{
break
;
}
code
=
clientProcessErrorList
(
&
pRequest
->
errList
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
NULL
==
pRequest
->
errList
)
{
break
;
}
int32_t
errNum
=
(
int32_t
)
taosArrayGetSize
(
pRequest
->
errList
);
for
(
int32_t
i
=
0
;
i
<
errNum
;
++
i
)
{
SQueryErrorInfo
*
errInfo
=
taosArrayGet
(
pRequest
->
errList
,
i
);
if
(
TSDB_CODE_VND_HASH_MISMATCH
==
errInfo
->
code
)
{
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
quit
=
true
;
break
;
}
SEpSet
epset
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
&
errInfo
->
tableName
,
dbFName
);
code
=
catalogRefreshDBVgInfo
(
pCatalog
,
pTscObj
->
pAppInfo
->
pTransporter
,
&
epset
,
dbFName
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
quit
=
true
;
break
;
}
}
}
if
(
!
quit
)
{
destroyRequest
(
pRequest
);
}
}
if
(
code
)
{
pRequest
->
code
=
code
;
}
return
pRequest
;
}
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
if
(
sqlLen
>
(
size_t
)
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
return
NULL
;
}
return
execQuery
(
pTscObj
,
sql
,
sqlLen
);
}
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
)
{
pEpSet
->
version
=
0
;
...
...
@@ -395,7 +481,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
tfree
(
pMsgBody
);
}
bool
persistConnForSpecificMsg
(
void
*
parenct
,
tmsg_t
msgType
)
{
return
msgType
==
TDMT_VND_QUERY_RSP
||
msgType
==
TDMT_VND_FETCH_RSP
||
msgType
==
TDMT_VND_RES_READY_RSP
;
return
msgType
==
TDMT_VND_QUERY_RSP
||
msgType
==
TDMT_VND_FETCH_RSP
||
msgType
==
TDMT_VND_RES_READY_RSP
||
msgType
==
TDMT_VND_QUERY_HEARTBEAT_RSP
;
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
ahandle
;
...
...
source/common/src/tmsg.c
浏览文件 @
d7406cab
...
...
@@ -2623,6 +2623,40 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *
void
tFreeSSchedulerHbRsp
(
SSchedulerHbRsp
*
pRsp
)
{
taosArrayDestroy
(
pRsp
->
taskStatus
);
}
int32_t
tSerializeSQueryTableRsp
(
void
*
buf
,
int32_t
bufLen
,
SQueryTableRsp
*
pRsp
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
code
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
tableName
.
type
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
tableName
.
acctId
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
tableName
.
dbname
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
tableName
.
tname
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSQueryTableRsp
(
void
*
buf
,
int32_t
bufLen
,
SQueryTableRsp
*
pRsp
)
{
SCoder
decoder
=
{
0
};
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
code
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
tableName
.
type
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
tableName
.
acctId
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pRsp
->
tableName
.
dbname
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pRsp
->
tableName
.
tname
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSVCreateTSmaReq
(
void
**
buf
,
SVCreateTSmaReq
*
pReq
)
{
int32_t
tlen
=
0
;
...
...
source/common/src/tname.c
浏览文件 @
d7406cab
...
...
@@ -222,6 +222,27 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId) {
return
0
;
}
bool
tNameDBNameEqual
(
SName
*
left
,
SName
*
right
)
{
if
(
NULL
==
left
)
{
if
(
NULL
==
right
)
{
return
true
;
}
return
false
;
}
if
(
NULL
==
right
)
{
return
false
;
}
if
(
left
->
acctId
!=
right
->
acctId
)
{
return
false
;
}
return
(
0
==
strcmp
(
left
->
dbname
,
right
->
dbname
));
}
int32_t
tNameFromString
(
SName
*
dst
,
const
char
*
str
,
uint32_t
type
)
{
assert
(
dst
!=
NULL
&&
str
!=
NULL
&&
strlen
(
str
)
>
0
);
...
...
@@ -273,13 +294,4 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return
0
;
}
SSchema
createSchema
(
uint8_t
type
,
int32_t
bytes
,
int32_t
colId
,
const
char
*
name
)
{
SSchema
s
=
{
0
};
s
.
type
=
type
;
s
.
bytes
=
bytes
;
s
.
colId
=
colId
;
tstrncpy
(
s
.
name
,
name
,
tListLen
(
s
.
name
));
return
s
;
}
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
d7406cab
...
...
@@ -913,12 +913,12 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
while
(
vindex
<
pDb
->
cfg
.
numOfVgroups
)
{
while
(
true
)
{
SVgObj
*
pVgroup
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
==
pDb
->
uid
)
{
if
(
NULL
==
pDb
||
pVgroup
->
dbUid
==
pDb
->
uid
)
{
SVgroupInfo
vgInfo
=
{
0
};
vgInfo
.
vgId
=
pVgroup
->
vgId
;
vgInfo
.
hashBegin
=
pVgroup
->
hashBegin
;
...
...
@@ -943,6 +943,10 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
}
sdbRelease
(
pSdb
,
pVgroup
);
if
(
pDb
&&
(
vindex
>=
pDb
->
cfg
.
numOfVgroups
))
{
break
;
}
}
sdbCancelFetch
(
pSdb
,
pIter
);
...
...
@@ -964,6 +968,20 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
char
*
p
=
strchr
(
usedbReq
.
db
,
'.'
);
if
(
p
&&
0
==
strcmp
(
p
+
1
,
TSDB_INFORMATION_SCHEMA_DB
))
{
memcpy
(
usedbRsp
.
db
,
usedbReq
.
db
,
TSDB_DB_FNAME_LEN
);
int32_t
vgVersion
=
taosGetTimestampSec
()
/
300
;
if
(
usedbReq
.
vgVersion
<
vgVersion
)
{
usedbRsp
.
pVgroupInfos
=
taosArrayInit
(
10
,
sizeof
(
SVgroupInfo
));
if
(
usedbRsp
.
pVgroupInfos
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
USE_DB_OVER
;
}
mndBuildDBVgroupInfo
(
NULL
,
pMnode
,
usedbRsp
.
pVgroupInfos
);
usedbRsp
.
vgVersion
=
vgVersion
;
}
else
{
usedbRsp
.
vgVersion
=
usedbReq
.
vgVersion
;
}
usedbRsp
.
vgNum
=
taosArrayGetSize
(
usedbRsp
.
pVgroupInfos
);
code
=
0
;
}
else
{
pDb
=
mndAcquireDb
(
pMnode
,
usedbReq
.
db
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
d7406cab
...
...
@@ -193,6 +193,9 @@ void vnodeOptionsInit(SVnodeCfg *pOptions);
*/
void
vnodeOptionsClear
(
SVnodeCfg
*
pOptions
);
int
vnodeValidateTableHash
(
SVnodeCfg
*
pVnodeOptions
,
char
*
tableName
);
/* ------------------------ FOR COMPILE ------------------------ */
int32_t
vnodeAlter
(
SVnode
*
pVnode
,
const
SVnodeCfg
*
pCfg
);
...
...
source/dnode/vnode/src/vnd/vnodeCfg.c
浏览文件 @
d7406cab
...
...
@@ -32,4 +32,23 @@ int vnodeValidateOptions(const SVnodeCfg *pVnodeOptions) {
void
vnodeOptionsCopy
(
SVnodeCfg
*
pDest
,
const
SVnodeCfg
*
pSrc
)
{
memcpy
((
void
*
)
pDest
,
(
void
*
)
pSrc
,
sizeof
(
SVnodeCfg
));
}
\ No newline at end of file
}
int
vnodeValidateTableHash
(
SVnodeCfg
*
pVnodeOptions
,
char
*
tableName
)
{
uint32_t
hashValue
=
0
;
switch
(
pVnodeOptions
->
hashMethod
)
{
default:
hashValue
=
MurmurHash3_32
(
tableName
,
strlen
(
tableName
));
break
;
}
if
(
hashValue
<
pVnodeOptions
->
hashBegin
||
hashValue
>
pVnodeOptions
->
hashEnd
)
{
terrno
=
TSDB_CODE_VND_HASH_MISMATCH
;
return
TSDB_CODE_VND_HASH_MISMATCH
;
}
return
TSDB_CODE_SUCCESS
;
}
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
d7406cab
...
...
@@ -30,7 +30,7 @@ void vnodeQueryClose(SVnode *pVnode) {
int
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in query queue is processing"
);
SReadHandle
handle
=
{.
reader
=
pVnode
->
pTsdb
,
.
meta
=
pVnode
->
pMeta
};
SReadHandle
handle
=
{.
reader
=
pVnode
->
pTsdb
,
.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
};
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_QUERY
:
{
...
...
@@ -89,6 +89,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
SRpcMsg
rpcMsg
;
int
msgLen
=
0
;
int32_t
code
=
TSDB_CODE_VND_APP_ERROR
;
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
STableInfoReq
infoReq
=
{
0
};
if
(
tDeserializeSTableInfoReq
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
infoReq
)
!=
0
)
{
...
...
@@ -96,6 +97,16 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
goto
_exit
;
}
metaRsp
.
dbId
=
pVnode
->
config
.
dbId
;
memcpy
(
metaRsp
.
dbFName
,
infoReq
.
dbFName
,
sizeof
(
metaRsp
.
dbFName
));
strcpy
(
metaRsp
.
tbName
,
infoReq
.
tbName
);
sprintf
(
tableFName
,
"%s.%s"
,
infoReq
.
dbFName
,
infoReq
.
tbName
);
code
=
vnodeValidateTableHash
(
&
pVnode
->
config
,
tableFName
);
if
(
code
)
{
goto
_exit
;
}
pTbCfg
=
metaGetTbInfoByName
(
pVnode
->
pMeta
,
infoReq
.
tbName
,
&
uid
);
if
(
pTbCfg
==
NULL
)
{
code
=
TSDB_CODE_VND_TB_NOT_EXIST
;
...
...
@@ -132,9 +143,6 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
goto
_exit
;
}
metaRsp
.
dbId
=
pVnode
->
config
.
dbId
;
memcpy
(
metaRsp
.
dbFName
,
infoReq
.
dbFName
,
sizeof
(
metaRsp
.
dbFName
));
strcpy
(
metaRsp
.
tbName
,
infoReq
.
tbName
);
if
(
pTbCfg
->
type
==
META_CHILD_TABLE
)
{
strcpy
(
metaRsp
.
stbName
,
pStbCfg
->
name
);
metaRsp
.
suid
=
pTbCfg
->
ctbCfg
.
suid
;
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
d7406cab
...
...
@@ -30,6 +30,7 @@ extern "C" {
#define CTG_DEFAULT_CACHE_TBLMETA_NUMBER 1000
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
#define CTG_RENT_SLOT_SECOND 1.5
...
...
@@ -159,8 +160,10 @@ typedef struct SCtgRemoveTblMsg {
typedef
struct
SCtgMetaAction
{
int32_t
act
;
void
*
data
;
int32_t
act
;
void
*
data
;
bool
syncReq
;
uint64_t
seqId
;
}
SCtgMetaAction
;
typedef
struct
SCtgQNode
{
...
...
@@ -168,14 +171,21 @@ typedef struct SCtgQNode {
struct
SCtgQNode
*
next
;
}
SCtgQNode
;
typedef
struct
SCatalogMgmt
{
bool
exit
;
SRWLatch
lock
;
typedef
struct
SCtgQueue
{
SRWLatch
qlock
;
uint64_t
seqId
;
uint64_t
seqDone
;
SCtgQNode
*
head
;
SCtgQNode
*
tail
;
tsem_t
sem
;
tsem_t
reqSem
;
tsem_t
rspSem
;
uint64_t
qRemainNum
;
}
SCtgQueue
;
typedef
struct
SCatalogMgmt
{
bool
exit
;
SRWLatch
lock
;
SCtgQueue
queue
;
pthread_t
updateThread
;
SHashObj
*
pCluster
;
//key: clusterId, value: SCatalog*
SCatalogStat
stat
;
...
...
@@ -191,8 +201,8 @@ typedef struct SCtgAction {
ctgActFunc
func
;
}
SCtgAction
;
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.qRemainNum, 1)
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.qRemainNum, 1)
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.q
ueue.q
RemainNum, 1)
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.q
ueue.q
RemainNum, 1)
#define CTG_STAT_ADD(n) atomic_add_fetch_64(&(n), 1)
#define CTG_STAT_SUB(n) atomic_sub_fetch_64(&(n), 1)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
d7406cab
此差异已折叠。
点击以展开。
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
d7406cab
...
...
@@ -713,7 +713,7 @@ void *ctgTestGetDbVgroupThread(void *param) {
int32_t
n
=
0
;
while
(
!
ctgTestStop
)
{
code
=
catalogGetDBVgInfo
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
false
,
&
vgList
);
code
=
catalogGetDBVgInfo
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
&
vgList
);
if
(
code
)
{
assert
(
0
);
}
...
...
@@ -2009,7 +2009,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
strcpy
(
n
.
dbname
,
"db1"
);
strcpy
(
n
.
tname
,
ctgTestTablename
);
code
=
catalogGetDBVgInfo
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
false
,
&
vgList
);
code
=
catalogGetDBVgInfo
(
pCtg
,
mockPointer
,
(
const
SEpSet
*
)
mockPointer
,
ctgTestDbname
,
&
vgList
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
taosArrayGetSize
((
const
SArray
*
)
vgList
),
ctgTestVgNum
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
d7406cab
...
...
@@ -731,7 +731,7 @@ int32_t getMaximumIdleDurationSec();
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
SQueryErrorInfo
*
errInfo
);
#ifdef __cplusplus
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
d7406cab
...
...
@@ -84,7 +84,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
}
qTaskInfo_t
pTaskInfo
=
NULL
;
code
=
qCreateExecTask
(
streamReadHandle
,
0
,
0
,
plan
,
&
pTaskInfo
,
NULL
);
code
=
qCreateExecTask
(
streamReadHandle
,
0
,
0
,
plan
,
&
pTaskInfo
,
NULL
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// TODO: destroy SSubplan & pTaskInfo
terrno
=
code
;
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
d7406cab
...
...
@@ -51,11 +51,11 @@ static void freeqinfoFn(void *qhandle) {
qDestroyTask
(
*
handle
);
}
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
)
{
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
SQueryErrorInfo
*
errInfo
)
{
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
errInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
d7406cab
...
...
@@ -8091,7 +8091,7 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
static
SArray
*
extractTableIdList
(
const
STableGroupInfo
*
pTableGroupInfo
);
static
SArray
*
extractScanColumnId
(
SNodeList
*
pNodeList
);
SOperatorInfo
*
doCreateOperatorTreeNode
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
queryId
,
uint64_t
taskId
,
STableGroupInfo
*
pTableGroupInfo
)
{
SOperatorInfo
*
doCreateOperatorTreeNode
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
queryId
,
uint64_t
taskId
,
STableGroupInfo
*
pTableGroupInfo
,
SQueryErrorInfo
*
errInfo
)
{
if
(
nodeType
(
pPhyNode
)
==
QUERY_NODE_PHYSICAL_PLAN_PROJECT
)
{
// ignore the project node
pPhyNode
=
nodesListGetNode
(
pPhyNode
->
pChildren
,
0
);
}
...
...
@@ -8100,10 +8100,20 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
==
nodeType
(
pPhyNode
))
{
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
tNameExtractFullName
(
&
pScanPhyNode
->
tableName
,
tableFName
);
int32_t
code
=
vnodeValidateTableHash
(
pHandle
->
config
,
tableFName
);
if
(
code
)
{
errInfo
->
code
=
code
;
errInfo
->
tableName
=
pScanPhyNode
->
tableName
;
return
NULL
;
}
size_t
numOfCols
=
LIST_LENGTH
(
pScanPhyNode
->
pScanCols
);
tsdbReaderT
pDataReader
=
doCreateDataReader
((
STableScanPhysiNode
*
)
pPhyNode
,
pHandle
,
(
uint64_t
)
queryId
,
taskId
);
int32_t
code
=
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
code
=
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
return
createTableScanOperatorInfo
(
pDataReader
,
pScanPhyNode
->
order
,
numOfCols
,
pScanPhyNode
->
count
,
pScanPhyNode
->
reverse
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
nodeType
(
pPhyNode
))
{
...
...
@@ -8134,7 +8144,10 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
i
);
SOperatorInfo
*
op
=
doCreateOperatorTreeNode
(
pChildNode
,
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
pTableGroupInfo
);
SOperatorInfo
*
op
=
doCreateOperatorTreeNode
(
pChildNode
,
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
pTableGroupInfo
,
errInfo
);
if
(
errInfo
->
code
)
{
return
NULL
;
}
SArray
*
pExprInfo
=
createExprInfo
((
SAggPhysiNode
*
)
pPhyNode
);
SSDataBlock
*
pResBlock
=
createOutputBuf_rv1
(
pPhyNode
->
pOutputDataBlockDesc
);
...
...
@@ -8253,7 +8266,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
return
NULL
;
}
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
)
{
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
SQueryErrorInfo
*
errInfo
)
{
uint64_t
queryId
=
pPlan
->
id
.
queryId
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -8264,7 +8277,12 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
STableGroupInfo
group
=
{
0
};
(
*
pTaskInfo
)
->
pRoot
=
doCreateOperatorTreeNode
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
&
group
);
(
*
pTaskInfo
)
->
pRoot
=
doCreateOperatorTreeNode
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
&
group
,
errInfo
);
if
(
errInfo
->
code
)
{
code
=
errInfo
->
code
;
goto
_complete
;
}
if
((
*
pTaskInfo
)
->
pRoot
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_complete
;
...
...
source/libs/executor/test/executorTests.cpp
浏览文件 @
d7406cab
...
...
@@ -946,7 +946,7 @@ TEST(testCase, build_executor_tree_Test) {
int32_t
code
=
qStringToSubplan
(
msg
,
&
plan
);
ASSERT_EQ
(
code
,
0
);
code
=
qCreateExecTask
(
&
handle
,
2
,
1
,
plan
,
(
void
**
)
&
pTaskInfo
,
&
sinkHandle
);
code
=
qCreateExecTask
(
&
handle
,
2
,
1
,
plan
,
(
void
**
)
&
pTaskInfo
,
&
sinkHandle
,
NULL
);
ASSERT_EQ
(
code
,
0
);
}
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
d7406cab
...
...
@@ -1125,7 +1125,7 @@ static int32_t translateShowTables(STranslateContext* pCxt) {
tNameGetFullDbName
(
&
name
,
dbFname
);
SArray
*
array
=
NULL
;
int32_t
code
=
catalogGetDBVgInfo
(
pCxt
->
pParseCxt
->
pCatalog
,
pCxt
->
pParseCxt
->
pTransporter
,
&
pCxt
->
pParseCxt
->
mgmtEpSet
,
dbFname
,
false
,
&
array
);
int32_t
code
=
catalogGetDBVgInfo
(
pCxt
->
pParseCxt
->
pCatalog
,
pCxt
->
pParseCxt
->
pTransporter
,
&
pCxt
->
pParseCxt
->
mgmtEpSet
,
dbFname
,
&
array
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
d7406cab
...
...
@@ -161,3 +161,17 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
rpcSendRequest
(
pTransporter
,
epSet
,
&
rpcMsg
,
pTransporterId
);
return
TSDB_CODE_SUCCESS
;
}
SSchema
createSchema
(
uint8_t
type
,
int32_t
bytes
,
int32_t
colId
,
const
char
*
name
)
{
SSchema
s
=
{
0
};
s
.
type
=
type
;
s
.
bytes
=
bytes
;
s
.
colId
=
colId
;
tstrncpy
(
s
.
name
,
name
,
tListLen
(
s
.
name
));
return
s
;
}
source/libs/qcom/src/querymsg.c
浏览文件 @
d7406cab
...
...
@@ -27,6 +27,7 @@ int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize
int32_t
queryBuildUseDbOutput
(
SUseDbOutput
*
pOut
,
SUseDbRsp
*
usedbRsp
)
{
memcpy
(
pOut
->
db
,
usedbRsp
->
db
,
TSDB_DB_FNAME_LEN
);
pOut
->
dbId
=
usedbRsp
->
uid
;
pOut
->
dbVgroup
=
calloc
(
1
,
sizeof
(
SDBVgInfo
));
if
(
NULL
==
pOut
->
dbVgroup
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -34,6 +35,11 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
pOut
->
dbVgroup
->
vgVersion
=
usedbRsp
->
vgVersion
;
pOut
->
dbVgroup
->
hashMethod
=
usedbRsp
->
hashMethod
;
if
(
usedbRsp
->
vgNum
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
pOut
->
dbVgroup
->
vgHash
=
taosHashInit
(
usedbRsp
->
vgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
pOut
->
dbVgroup
->
vgHash
)
{
...
...
source/libs/qworker/inc/qworkerMsg.h
浏览文件 @
d7406cab
...
...
@@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
,
bool
qComplete
);
int32_t
qwBuildAndSendCQueryMsg
(
QW_FPARAMS_DEF
,
void
*
connection
);
int32_t
qwBuildAndSendReadyRsp
(
void
*
connection
,
int32_t
code
);
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
);
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
,
SQueryErrorInfo
*
errInfo
);
void
qwFreeFetchRsp
(
void
*
msg
);
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
);
int32_t
qwGetSchTasksStatus
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SSchedulerStatusRsp
**
rsp
);
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
d7406cab
...
...
@@ -998,6 +998,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
qTaskInfo_t
pTaskInfo
=
NULL
;
DataSinkHandle
sinkHandle
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
SQueryErrorInfo
errInfo
=
{
0
};
QW_ERR_JRET
(
qwHandlePrePhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_PRE_QUERY
,
&
input
,
&
output
));
...
...
@@ -1019,7 +1020,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET
(
code
);
}
code
=
qCreateExecTask
(
qwMsg
->
node
,
0
,
tId
,
(
struct
SSubplan
*
)
plan
,
&
pTaskInfo
,
&
sinkHandle
);
code
=
qCreateExecTask
(
qwMsg
->
node
,
0
,
tId
,
(
struct
SSubplan
*
)
plan
,
&
pTaskInfo
,
&
sinkHandle
,
&
errInfo
);
if
(
code
)
{
QW_TASK_ELOG
(
"qCreateExecTask failed, code:%s"
,
tstrerror
(
code
));
QW_ERR_JRET
(
code
);
...
...
@@ -1032,7 +1033,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
//TODO OPTIMIZE EMTYP RESULT QUERY RSP TO AVOID FURTHER FETCH
QW_ERR_JRET
(
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
code
));
QW_ERR_JRET
(
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
code
,
NULL
));
QW_TASK_DLOG
(
"query msg rsped, code:%d"
,
code
);
queryRsped
=
true
;
...
...
@@ -1051,7 +1052,7 @@ _return:
}
if
(
!
queryRsped
)
{
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
rspCode
);
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
rspCode
,
&
errInfo
);
QW_TASK_DLOG
(
"query msg rsped, code:%x"
,
rspCode
);
}
...
...
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
d7406cab
...
...
@@ -44,17 +44,23 @@ void qwFreeFetchRsp(void *msg) {
}
}
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
)
{
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
,
SQueryErrorInfo
*
errInfo
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
code
=
code
;
SQueryTableRsp
rsp
=
{.
code
=
code
};
if
(
errInfo
&&
errInfo
->
code
)
{
rsp
.
tableName
=
errInfo
->
tableName
;
}
int32_t
contLen
=
tSerializeSQueryTableRsp
(
NULL
,
0
,
&
rsp
);
void
*
msg
=
rpcMallocCont
(
contLen
);
tSerializeSQueryTableRsp
(
msg
,
contLen
,
&
rsp
);
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_QUERY_RSP
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
)
,
.
pCont
=
msg
,
.
contLen
=
contLen
,
.
code
=
code
,
};
...
...
source/libs/qworker/test/qworkerTests.cpp
浏览文件 @
d7406cab
...
...
@@ -262,7 +262,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
return
;
}
int32_t
qwtCreateExecTask
(
void
*
tsdb
,
int32_t
vgId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
)
{
int32_t
qwtCreateExecTask
(
void
*
tsdb
,
int32_t
vgId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
SQueryErrorInfo
*
errInfo
)
{
int32_t
idx
=
abs
((
++
qwtTestCaseIdx
)
%
qwtTestCaseNum
);
qwtTestSinkBlockNum
=
0
;
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
d7406cab
...
...
@@ -136,6 +136,7 @@ typedef struct SSchJob {
uint64_t
queryId
;
SSchJobAttr
attr
;
int32_t
levelNum
;
int32_t
taskNum
;
void
*
transport
;
SArray
*
nodeList
;
// qnode/vnode list, element is SQueryNodeAddr
SArray
*
levels
;
// Element is SQueryLevel, starting from 0. SArray<SSchLevel>
...
...
@@ -154,7 +155,8 @@ typedef struct SSchJob {
int32_t
remoteFetch
;
SSchTask
*
fetchTask
;
int32_t
errCode
;
void
*
res
;
//TODO free it or not
SArray
*
errList
;
// SArray<SQueryErrorInfo>
void
*
resData
;
//TODO free it or not
int32_t
resNumOfRows
;
const
char
*
sql
;
SQueryProfileSummary
summary
;
...
...
@@ -168,9 +170,9 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_
TASK_NEED_WAIT_ALL(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY
)
#define SCH_
TASK_NO_NEED_DROP(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY
)
#define SCH_IS_DATA_SRC_
QRY_
TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_
IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)
)
#define SCH_
IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum
)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
...
...
@@ -180,12 +182,14 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_
QRY_
TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job)
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
...
...
@@ -219,7 +223,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t
schLaunchTasksInFlowCtrlList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schLaunchTaskImpl
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schFetchFromRemote
(
SSchJob
*
pJob
);
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
);
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
SQueryErrorInfo
*
errInfo
);
#ifdef __cplusplus
...
...
source/libs/scheduler/src/schFlowCtrl.c
浏览文件 @
d7406cab
...
...
@@ -259,7 +259,7 @@ _return:
SCH_UNLOCK
(
SCH_WRITE
,
&
ctrl
->
lock
);
if
(
code
)
{
code
=
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
);
code
=
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
,
NULL
);
}
SCH_RET
(
code
);
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
d7406cab
...
...
@@ -410,6 +410,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_TASK_ELOG
(
"taosHashPut to planToTaks failed, taskIdx:%d"
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
++
pJob
->
taskNum
;
}
SCH_JOB_DLOG
(
"level initialized, taskNum:%d"
,
taskNum
);
...
...
@@ -467,7 +469,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
if
(
addNum
<=
0
)
{
SCH_TASK_ELOG
(
"no available execNode as candidates, nodeNum:%d"
,
nodeNum
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
)
;
}
/*
...
...
@@ -588,7 +590,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return
TSDB_CODE_SUCCESS
;
//TODO CHECK epList/condidateList
if
(
SCH_IS_DATA_SRC_TASK
(
pTask
))
{
if
(
SCH_IS_DATA_SRC_
QRY_
TASK
(
pTask
))
{
}
else
{
int32_t
candidateNum
=
taosArrayGetSize
(
pTask
->
candidateAddrs
);
...
...
@@ -611,7 +613,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET
(
schLaunchTasksInFlowCtrlList
(
pJob
,
pTask
));
}
if
(
SCH_IS_DATA_SRC_TASK
(
pTask
))
{
if
(
SCH_IS_DATA_SRC_
QRY_
TASK
(
pTask
))
{
SCH_SWITCH_EPSET
(
&
pTask
->
plan
->
execNode
);
}
else
{
++
pTask
->
candidateIdx
;
...
...
@@ -727,8 +729,32 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
tsem_post
(
&
job
->
rspSem
);
}
int32_t
schPushToErrInfoList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryErrorInfo
*
errInfo
)
{
if
(
NULL
==
errInfo
||
!
SCH_IS_DATA_SRC_TASK
(
pTask
)
||
!
IS_CLIENT_RETRY_ERROR
(
errInfo
->
code
))
{
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
==
pJob
->
errList
)
{
SSchLevel
*
level
=
taosArrayGetLast
(
pJob
->
levels
);
pJob
->
errList
=
taosArrayInit
(
level
->
taskNum
,
sizeof
(
SQueryErrorInfo
));
if
(
NULL
==
pJob
->
errList
)
{
SCH_TASK_ELOG
(
"taosArrayInit %d errInfofailed"
,
pJob
->
taskNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
if
(
NULL
==
taosArrayPush
(
pJob
->
errList
,
errInfo
))
{
SCH_TASK_ELOG
(
"taosArrayPush errInfo to list failed, errCode:%x"
,
errInfo
->
code
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
return
TSDB_CODE_SUCCESS
;
}
// Note: no more task error processing, handled in function internal
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
)
{
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
SQueryErrorInfo
*
errInfo
)
{
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
...
...
@@ -752,13 +778,15 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
if
(
SCH_GET_TASK_STATUS
(
pTask
)
==
JOB_TASK_STATUS_EXECUTING
)
{
SCH_ERR_JRET
(
schMoveTaskToFailList
(
pJob
,
pTask
,
&
moved
));
}
else
{
SCH_TASK_
DLOG
(
"task already done, no more failure process
, status:%d"
,
SCH_GET_TASK_STATUS
(
pTask
));
return
TSDB_CODE_SUCCESS
;
SCH_TASK_
ELOG
(
"task not in executing list
, status:%d"
,
SCH_GET_TASK_STATUS
(
pTask
));
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
)
;
}
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_FAILED
);
SCH_ERR_JRET
(
schPushToErrInfoList
(
pJob
,
pTask
,
errInfo
));
if
(
SCH_
TASK_NEED_WAIT_ALL
(
pTask
))
{
if
(
SCH_
IS_WAIT_ALL_JOB
(
pJob
))
{
SCH_LOCK
(
SCH_WRITE
,
&
pTask
->
level
->
lock
);
pTask
->
level
->
taskFailed
++
;
taskDone
=
pTask
->
level
->
taskSucceed
+
pTask
->
level
->
taskFailed
;
...
...
@@ -801,7 +829,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
if
(
parentNum
==
0
)
{
int32_t
taskDone
=
0
;
if
(
SCH_
TASK_NEED_WAIT_ALL
(
pTask
))
{
if
(
SCH_
IS_WAIT_ALL_JOB
(
pJob
))
{
SCH_LOCK
(
SCH_WRITE
,
&
pTask
->
level
->
lock
);
pTask
->
level
->
taskSucceed
++
;
taskDone
=
pTask
->
level
->
taskSucceed
+
pTask
->
level
->
taskFailed
;
...
...
@@ -870,11 +898,11 @@ int32_t schFetchFromRemote(SSchJob *pJob) {
return
TSDB_CODE_SUCCESS
;
}
void
*
res
=
atomic_load_ptr
(
&
pJob
->
res
);
if
(
res
)
{
void
*
res
Data
=
atomic_load_ptr
(
&
pJob
->
resData
);
if
(
res
Data
)
{
atomic_val_compare_exchange_32
(
&
pJob
->
remoteFetch
,
1
,
0
);
SCH_JOB_DLOG
(
"res already fetched, res:%p"
,
res
);
SCH_JOB_DLOG
(
"res already fetched, res:%p"
,
res
Data
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -886,7 +914,7 @@ _return:
atomic_val_compare_exchange_32
(
&
pJob
->
remoteFetch
,
1
,
0
);
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pJob
->
fetchTask
,
code
));
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pJob
->
fetchTask
,
code
,
NULL
));
}
...
...
@@ -894,6 +922,8 @@ _return:
int32_t
schHandleResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
bool
errInfoGot
=
false
;
SQueryErrorInfo
errInfo
=
{
0
};
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_ELOG
(
"rsp not processed cause of job status, job status:%d"
,
status
);
...
...
@@ -933,13 +963,23 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break
;
}
case
TDMT_VND_QUERY_RSP
:
{
SQueryTableRsp
*
rsp
=
(
SQueryTableRsp
*
)
msg
;
SQueryTableRsp
rsp
=
{
0
};
if
(
msg
)
{
tDeserializeSQueryTableRsp
(
msg
,
msgSize
,
&
rsp
);
if
(
rsp
.
code
)
{
errInfo
.
code
=
rsp
.
code
;
errInfo
.
tableName
=
rsp
.
tableName
;
errInfoGot
=
true
;
}
SCH_ERR_JRET
(
rsp
.
code
);
}
SCH_ERR_JRET
(
rspCode
);
if
(
NULL
==
msg
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SCH_ERR_JRET
(
rsp
->
code
);
SCH_ERR_JRET
(
schBuildAndSendMsg
(
pJob
,
pTask
,
NULL
,
TDMT_VND_RES_READY
));
...
...
@@ -966,13 +1006,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
if
(
pJob
->
res
)
{
SCH_TASK_ELOG
(
"got fetch rsp while res already exists, res:%p"
,
pJob
->
res
);
if
(
pJob
->
res
Data
)
{
SCH_TASK_ELOG
(
"got fetch rsp while res already exists, res:%p"
,
pJob
->
res
Data
);
tfree
(
rsp
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
atomic_store_ptr
(
&
pJob
->
res
,
rsp
);
atomic_store_ptr
(
&
pJob
->
res
Data
,
rsp
);
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
htonl
(
rsp
->
numOfRows
));
if
(
rsp
->
completed
)
{
...
...
@@ -999,7 +1039,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
_return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
,
errInfoGot
?
&
errInfo
:
NULL
));
}
...
...
@@ -1374,6 +1414,12 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SCH_RET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
// NOTE: race condition: the task should be put into the hash table before send msg to server
if
(
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_EXECUTING
)
{
SCH_ERR_RET
(
schPushTaskToExecList
(
pJob
,
pTask
));
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_EXECUTING
);
}
SSubplan
*
plan
=
pTask
->
plan
;
...
...
@@ -1389,12 +1435,6 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET
(
schSetTaskCandidateAddrs
(
pJob
,
pTask
));
// NOTE: race condition: the task should be put into the hash table before send msg to server
if
(
SCH_GET_TASK_STATUS
(
pTask
)
!=
JOB_TASK_STATUS_EXECUTING
)
{
SCH_ERR_RET
(
schPushTaskToExecList
(
pJob
,
pTask
));
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_EXECUTING
);
}
if
(
SCH_IS_QUERY_JOB
(
pJob
))
{
SCH_ERR_RET
(
schEnsureHbConnection
(
pJob
,
pTask
));
}
...
...
@@ -1423,7 +1463,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
_return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
,
NULL
));
}
int32_t
schLaunchLevelTasks
(
SSchJob
*
pJob
,
SSchLevel
*
level
)
{
...
...
@@ -1474,13 +1514,15 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
}
void
schDropTaskInHashList
(
SSchJob
*
pJob
,
SHashObj
*
list
)
{
if
(
!
SCH_IS_NEED_DROP_JOB
(
pJob
))
{
return
;
}
void
*
pIter
=
taosHashIterate
(
list
,
NULL
);
while
(
pIter
)
{
SSchTask
*
pTask
=
*
(
SSchTask
**
)
pIter
;
if
(
!
SCH_TASK_NO_NEED_DROP
(
pTask
))
{
schDropTaskOnExecutedNode
(
pJob
,
pTask
);
}
schDropTaskOnExecutedNode
(
pJob
,
pTask
);
pIter
=
taosHashIterate
(
list
,
pIter
);
}
...
...
@@ -1537,8 +1579,9 @@ void schFreeJobImpl(void *job) {
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
tfree
(
pJob
->
res
);
taosArrayDestroy
(
pJob
->
errList
);
tfree
(
pJob
->
resData
);
tfree
(
pJob
);
...
...
@@ -1673,8 +1716,12 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, in
SCH_ERR_RET
(
schExecJobImpl
(
transport
,
nodeList
,
pDag
,
pJob
,
sql
,
true
));
SSchJob
*
job
=
schAcquireJob
(
*
pJob
);
pRes
->
code
=
atomic_load_32
(
&
job
->
errCode
);
pRes
->
numOfRows
=
job
->
resNumOfRows
;
pRes
->
errList
=
job
->
errList
;
job
->
errList
=
NULL
;
schReleaseJob
(
*
pJob
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1862,14 +1909,14 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
if
(
pJob
->
res
&&
((
SRetrieveTableRsp
*
)
pJob
->
res
)
->
completed
)
{
if
(
pJob
->
res
Data
&&
((
SRetrieveTableRsp
*
)
pJob
->
resData
)
->
completed
)
{
SCH_ERR_JRET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCCEED
));
}
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
res
,
*
pData
,
NULL
))
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
Data
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
res
Data
,
*
pData
,
NULL
))
{
continue
;
}
...
...
source/util/src/terror.c
浏览文件 @
d7406cab
...
...
@@ -324,6 +324,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operat
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_SYNCING
,
"Database is syncing"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INVALID_TSDB_STATE
,
"Invalid tsdb state"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_TB_NOT_EXIST
,
"Table not exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_HASH_MISMATCH
,
"Hash value mismatch"
)
// tsdb
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_INVALID_TABLE_ID
,
"Invalid table ID"
)
...
...
@@ -414,7 +415,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limi
TAOS_DEFINE_ERROR
(
TSDB_CODE_WAL_INVALID_VER
,
"WAL use invalid version"
)
// tfs
TAOS_DEFINE_ERROR
(
TSDB_CODE_FS_APP_ERROR
,
"tfs out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_FS_APP_ERROR
,
"tfs out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_FS_INVLD_CFG
,
"tfs invalid mount config"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_FS_TOO_MANY_MOUNT
,
"tfs too many mount"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_FS_DUP_PRIMARY
,
"tfs duplicate primary mount"
)
...
...
@@ -432,6 +433,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error"
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_SYS_ERROR
,
"catalog system error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_DB_DROPPED
,
"Database is dropped"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_OUT_OF_SERVICE
,
"catalog is out of service"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_CTG_VG_META_MISMATCH
,
"table meta and vgroup mismatch"
)
//scheduler
TAOS_DEFINE_ERROR
(
TSDB_CODE_SCH_STATUS_ERROR
,
"scheduler status error"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录