Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
763eccf8
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
763eccf8
编写于
3月 10, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/scheduler
上级
54de6fd9
变更
31
隐藏空白更改
内联
并排
Showing
31 changed file
with
562 addition
and
227 deletion
+562
-227
include/common/tmsg.h
include/common/tmsg.h
+7
-0
include/common/tname.h
include/common/tname.h
+2
-2
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+18
-8
include/libs/executor/executor.h
include/libs/executor/executor.h
+3
-1
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+2
-2
include/libs/qcom/query.h
include/libs/qcom/query.h
+12
-0
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+5
-4
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+92
-10
source/common/src/tmsg.c
source/common/src/tmsg.c
+1
-0
source/common/src/tname.c
source/common/src/tname.c
+21
-9
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+3
-0
source/dnode/vnode/src/vnd/vnodeCfg.c
source/dnode/vnode/src/vnd/vnodeCfg.c
+20
-1
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+1
-1
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+17
-8
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+203
-124
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-1
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+2
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+33
-10
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+1
-1
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+14
-0
source/libs/qworker/inc/qworkerMsg.h
source/libs/qworker/inc/qworkerMsg.h
+1
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+4
-3
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+11
-5
source/libs/qworker/test/qworkerTests.cpp
source/libs/qworker/test/qworkerTests.cpp
+1
-1
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+11
-7
source/libs/scheduler/src/schFlowCtrl.c
source/libs/scheduler/src/schFlowCtrl.c
+1
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+71
-24
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
763eccf8
...
...
@@ -24,6 +24,7 @@
#include "thash.h"
#include "tlist.h"
#include "trow.h"
#include "tname.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -459,8 +460,14 @@ typedef struct {
typedef
struct
{
int32_t
code
;
SName
tableName
;
}
SQueryTableRsp
;
int32_t
tSerializeSQueryTableRsp
(
void
*
buf
,
int32_t
bufLen
,
SQueryTableRsp
*
pRsp
);
int32_t
tDeserializeSQueryTableRsp
(
void
*
buf
,
int32_t
bufLen
,
SQueryTableRsp
*
pRsp
);
typedef
struct
{
char
db
[
TSDB_DB_FNAME_LEN
];
int32_t
numOfVgroups
;
...
...
include/common/tname.h
浏览文件 @
763eccf8
...
...
@@ -17,7 +17,6 @@
#define _TD_COMMON_NAME_H_
#include "tdef.h"
#include "tmsg.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -61,7 +60,8 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type);
int32_t
tNameSetAcctId
(
SName
*
dst
,
int32_t
acctId
);
SSchema
createSchema
(
uint8_t
type
,
int32_t
bytes
,
int32_t
colId
,
const
char
*
name
);
bool
tNameDBNameEqual
(
SName
*
left
,
SName
*
right
);
#ifdef __cplusplus
}
...
...
include/libs/catalog/catalog.h
浏览文件 @
763eccf8
...
...
@@ -120,7 +120,7 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
...
...
@@ -131,7 +131,7 @@ int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSe
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
...
...
@@ -140,28 +140,38 @@ int32_t catalogGetSTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpS
int32_t
catalogUpdateSTableMeta
(
SCatalog
*
pCatalog
,
STableMetaRsp
*
rspMsg
);
/**
* Force refresh DB's local cached vgroup info.
* @param pCtg (input, got with catalogGetHandle)
* @param pTrans (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param dbFName (input, db full name)
* @return error code
*/
int32_t
catalogRefreshDBVgInfo
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
);
/**
* Force refresh a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t
catalogRefreshTableMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
);
int32_t
catalogRefreshTableMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
);
/**
* Force refresh a table's local cached meta data and get the new one.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t
catalogRefreshGetTableMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
);
int32_t
catalogRefreshGetTableMeta
(
SCatalog
*
pCatalog
,
void
*
pTransporter
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
,
int32_t
isSTable
);
...
...
@@ -170,7 +180,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg);
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
...
...
@@ -181,7 +191,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name
, NOT including db name
)
* @param pTableName (input, table name)
* @param vgInfo (output, vgroup info)
* @return error code
*/
...
...
include/libs/executor/executor.h
浏览文件 @
763eccf8
...
...
@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "tcommon.h"
#include "query.h"
typedef
void
*
qTaskInfo_t
;
typedef
void
*
DataSinkHandle
;
...
...
@@ -30,6 +31,7 @@ struct SSubplan;
typedef
struct
SReadHandle
{
void
*
reader
;
void
*
meta
;
void
*
config
;
}
SReadHandle
;
/**
...
...
@@ -67,7 +69,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
* @param qId
* @return
*/
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
);
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
SQueryErrorInfo
*
errInfo
);
/**
* The main task execution function, including query on both table and multiple tables,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
763eccf8
...
...
@@ -90,7 +90,7 @@ typedef struct SSubLogicPlan {
}
SSubLogicPlan
;
typedef
struct
SQueryLogicPlan
{
ENodeType
type
;
;
ENodeType
type
;
SNodeList
*
pSubplans
;
}
SQueryLogicPlan
;
...
...
@@ -127,7 +127,7 @@ typedef struct SScanPhysiNode {
int32_t
order
;
// scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t
count
;
// repeat count
int32_t
reverse
;
// reverse scan count
char
tableName
[
TSDB_TABLE_NAME_LEN
]
;
SName
tableName
;
}
SScanPhysiNode
;
typedef
SScanPhysiNode
SSystemTableScanPhysiNode
;
...
...
include/libs/qcom/query.h
浏览文件 @
763eccf8
...
...
@@ -138,6 +138,11 @@ typedef struct SQueryNodeStat {
int32_t
tableNum
;
// vg table number, unit is TSDB_TABLE_NUM_UNIT
}
SQueryNodeStat
;
typedef
struct
SQueryErrorInfo
{
int32_t
code
;
SName
tableName
;
}
SQueryErrorInfo
;
int32_t
initTaskQueue
();
int32_t
cleanupTaskQueue
();
...
...
@@ -170,6 +175,9 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
int32_t
queryCreateTableMetaFromMsg
(
STableMetaRsp
*
msg
,
bool
isSuperTable
,
STableMeta
**
pMeta
);
SSchema
createSchema
(
uint8_t
type
,
int32_t
bytes
,
int32_t
colId
,
const
char
*
name
);
extern
int32_t
(
*
queryBuildMsg
[
TDMT_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TDMT_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
...
...
@@ -178,6 +186,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define IS_CLIENT_RETRY_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH)
#define IS_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT)
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); }} while(0)
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); }} while(0)
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
763eccf8
...
...
@@ -52,10 +52,11 @@ typedef struct SQueryProfileSummary {
}
SQueryProfileSummary
;
typedef
struct
SQueryResult
{
int32_t
code
;
uint64_t
numOfRows
;
int32_t
msgSize
;
char
*
msg
;
int32_t
code
;
SArray
*
errList
;
// SArray<SQueryErrorInfo>
uint64_t
numOfRows
;
int32_t
msgSize
;
char
*
msg
;
}
SQueryResult
;
typedef
struct
STaskInfo
{
...
...
include/util/taoserror.h
浏览文件 @
763eccf8
...
...
@@ -326,6 +326,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513)
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514)
#define TSDB_CODE_VND_TB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0515)
#define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0516)
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
...
...
source/client/inc/clientInt.h
浏览文件 @
763eccf8
...
...
@@ -184,6 +184,7 @@ typedef struct SRequestObj {
char
*
msgBuf
;
void
*
pInfo
;
// sql parse info, generated by parser module
int32_t
code
;
SArray
*
errList
;
// SArray<SQueryErrorInfo>
SQueryExecMetric
metric
;
SRequestSendRecvBody
body
;
}
SRequestObj
;
...
...
source/client/src/clientImpl.c
浏览文件 @
763eccf8
...
...
@@ -223,6 +223,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
}
pRequest
->
errList
=
res
.
errList
;
pRequest
->
code
=
code
;
return
pRequest
->
code
;
}
...
...
@@ -234,19 +235,13 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
}
}
pRequest
->
errList
=
res
.
errList
;
pRequest
->
code
=
res
.
code
;
return
pRequest
->
code
;
}
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
if
(
sqlLen
>
(
size_t
)
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
return
NULL
;
}
SRequestObj
*
execQueryImpl
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
)
{
SRequestObj
*
pRequest
=
NULL
;
SQuery
*
pQuery
;
SArray
*
pNodeList
=
taosArrayInit
(
4
,
sizeof
(
struct
SQueryNodeAddr
));
...
...
@@ -273,6 +268,93 @@ _return:
return
pRequest
;
}
int32_t
clientProcessErrorList
(
SArray
**
pList
)
{
SArray
*
errList
=
*
pList
;
int32_t
errNum
=
(
int32_t
)
taosArrayGetSize
(
errList
);
for
(
int32_t
i
=
0
;
i
<
errNum
;
++
i
)
{
SQueryErrorInfo
*
errInfo
=
taosArrayGet
(
errList
,
i
);
if
(
TSDB_CODE_VND_HASH_MISMATCH
==
errInfo
->
code
)
{
if
(
i
==
(
errNum
-
1
))
{
break
;
}
// TODO REMOVE SAME DB ERROR
}
else
{
taosArrayRemove
(
errList
,
i
);
--
i
;
--
errNum
;
}
}
if
(
0
==
errNum
)
{
taosArrayDestroy
(
*
pList
);
*
pList
=
NULL
;
}
return
TSDB_CODE_SUCCESS
;
}
SRequestObj
*
execQuery
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
)
{
SRequestObj
*
pRequest
=
NULL
;
int32_t
code
=
0
;
bool
quit
=
false
;
while
(
!
quit
)
{
pRequest
=
execQueryImpl
(
pTscObj
,
sql
,
sqlLen
);
if
(
TSDB_CODE_SUCCESS
==
pRequest
->
code
||
NULL
==
pRequest
->
errList
)
{
break
;
}
code
=
clientProcessErrorList
(
&
pRequest
->
errList
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
NULL
==
pRequest
->
errList
)
{
break
;
}
int32_t
errNum
=
(
int32_t
)
taosArrayGetSize
(
pRequest
->
errList
);
for
(
int32_t
i
=
0
;
i
<
errNum
;
++
i
)
{
SQueryErrorInfo
*
errInfo
=
taosArrayGet
(
pRequest
->
errList
,
i
);
if
(
TSDB_CODE_VND_HASH_MISMATCH
==
errInfo
->
code
)
{
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
quit
=
true
;
break
;
}
SEpSet
epset
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
);
char
dbFName
[
TSDB_DB_FNAME_LEN
];
tNameGetFullDbName
(
&
errInfo
->
tableName
,
dbFName
);
code
=
catalogRefreshDBVgInfo
(
pCatalog
,
pTscObj
->
pAppInfo
->
pTransporter
,
&
epset
,
dbFName
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
quit
=
true
;
break
;
}
}
}
}
if
(
code
)
{
pRequest
->
code
=
code
;
}
return
pRequest
;
}
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
if
(
sqlLen
>
(
size_t
)
TSDB_MAX_ALLOWED_SQL_LEN
)
{
tscError
(
"sql string exceeds max length:%d"
,
TSDB_MAX_ALLOWED_SQL_LEN
);
terrno
=
TSDB_CODE_TSC_EXCEED_SQL_LIMIT
;
return
NULL
;
}
return
execQuery
(
pTscObj
,
sql
,
sqlLen
);
}
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
)
{
pEpSet
->
version
=
0
;
...
...
@@ -389,7 +471,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
tfree
(
pMsgBody
);
}
bool
persistConnForSpecificMsg
(
void
*
parenct
,
tmsg_t
msgType
)
{
return
msgType
==
TDMT_VND_QUERY_RSP
||
msgType
==
TDMT_VND_FETCH_RSP
||
msgType
==
TDMT_VND_RES_READY_RSP
;
return
msgType
==
TDMT_VND_QUERY_RSP
||
msgType
==
TDMT_VND_FETCH_RSP
||
msgType
==
TDMT_VND_RES_READY_RSP
||
msgType
==
TDMT_VND_QUERY_HEARTBEAT_RSP
;
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
ahandle
;
...
...
source/common/src/tmsg.c
浏览文件 @
763eccf8
无法预览此类型文件
source/common/src/tname.c
浏览文件 @
763eccf8
...
...
@@ -222,6 +222,27 @@ int32_t tNameSetAcctId(SName* dst, int32_t acctId) {
return
0
;
}
bool
tNameDBNameEqual
(
SName
*
left
,
SName
*
right
)
{
if
(
NULL
==
left
)
{
if
(
NULL
==
right
)
{
return
true
;
}
return
false
;
}
if
(
NULL
==
right
)
{
return
false
;
}
if
(
left
->
acctId
!=
right
->
acctId
)
{
return
false
;
}
return
(
0
==
strcmp
(
left
->
dbname
,
right
->
dbname
));
}
int32_t
tNameFromString
(
SName
*
dst
,
const
char
*
str
,
uint32_t
type
)
{
assert
(
dst
!=
NULL
&&
str
!=
NULL
&&
strlen
(
str
)
>
0
);
...
...
@@ -273,13 +294,4 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
return
0
;
}
SSchema
createSchema
(
uint8_t
type
,
int32_t
bytes
,
int32_t
colId
,
const
char
*
name
)
{
SSchema
s
=
{
0
};
s
.
type
=
type
;
s
.
bytes
=
bytes
;
s
.
colId
=
colId
;
tstrncpy
(
s
.
name
,
name
,
tListLen
(
s
.
name
));
return
s
;
}
source/dnode/vnode/inc/vnode.h
浏览文件 @
763eccf8
...
...
@@ -193,6 +193,9 @@ void vnodeOptionsInit(SVnodeCfg *pOptions);
*/
void
vnodeOptionsClear
(
SVnodeCfg
*
pOptions
);
int
vnodeValidateTableHash
(
SVnodeCfg
*
pVnodeOptions
,
char
*
tableName
);
/* ------------------------ FOR COMPILE ------------------------ */
int32_t
vnodeAlter
(
SVnode
*
pVnode
,
const
SVnodeCfg
*
pCfg
);
...
...
source/dnode/vnode/src/vnd/vnodeCfg.c
浏览文件 @
763eccf8
...
...
@@ -32,4 +32,23 @@ int vnodeValidateOptions(const SVnodeCfg *pVnodeOptions) {
void
vnodeOptionsCopy
(
SVnodeCfg
*
pDest
,
const
SVnodeCfg
*
pSrc
)
{
memcpy
((
void
*
)
pDest
,
(
void
*
)
pSrc
,
sizeof
(
SVnodeCfg
));
}
\ No newline at end of file
}
int
vnodeValidateTableHash
(
SVnodeCfg
*
pVnodeOptions
,
char
*
tableName
)
{
uint32_t
hashValue
=
0
;
switch
(
pVnodeOptions
->
hashMethod
)
{
default:
hashValue
=
MurmurHash3_32
(
tableName
,
strlen
(
tableName
));
break
;
}
if
(
hashValue
<
pVnodeOptions
->
hashBegin
||
hashValue
>
pVnodeOptions
->
hashEnd
)
{
terrno
=
TSDB_CODE_VND_HASH_MISMATCH
;
return
TSDB_CODE_VND_HASH_MISMATCH
;
}
return
TSDB_CODE_SUCCESS
;
}
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
763eccf8
...
...
@@ -30,7 +30,7 @@ void vnodeQueryClose(SVnode *pVnode) {
int
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in query queue is processing"
);
SReadHandle
handle
=
{.
reader
=
pVnode
->
pTsdb
,
.
meta
=
pVnode
->
pMeta
};
SReadHandle
handle
=
{.
reader
=
pVnode
->
pTsdb
,
.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
};
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_QUERY
:{
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
763eccf8
...
...
@@ -159,8 +159,10 @@ typedef struct SCtgRemoveTblMsg {
typedef
struct
SCtgMetaAction
{
int32_t
act
;
void
*
data
;
int32_t
act
;
void
*
data
;
bool
syncReq
;
uint64_t
seqId
;
}
SCtgMetaAction
;
typedef
struct
SCtgQNode
{
...
...
@@ -168,14 +170,21 @@ typedef struct SCtgQNode {
struct
SCtgQNode
*
next
;
}
SCtgQNode
;
typedef
struct
SCatalogMgmt
{
bool
exit
;
SRWLatch
lock
;
typedef
struct
SCtgQueue
{
SRWLatch
qlock
;
uint64_t
seqId
;
uint64_t
seqDone
;
SCtgQNode
*
head
;
SCtgQNode
*
tail
;
tsem_t
sem
;
tsem_t
reqSem
;
tsem_t
rspSem
;
uint64_t
qRemainNum
;
}
SCtgQueue
;
typedef
struct
SCatalogMgmt
{
bool
exit
;
SRWLatch
lock
;
SCtgQueue
queue
;
pthread_t
updateThread
;
SHashObj
*
pCluster
;
//key: clusterId, value: SCatalog*
SCatalogStat
stat
;
...
...
@@ -191,8 +200,8 @@ typedef struct SCtgAction {
ctgActFunc
func
;
}
SCtgAction
;
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.qRemainNum, 1)
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.qRemainNum, 1)
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.q
ueue.q
RemainNum, 1)
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.q
ueue.q
RemainNum, 1)
#define CTG_STAT_ADD(n) atomic_add_fetch_64(&(n), 1)
#define CTG_STAT_SUB(n) atomic_sub_fetch_64(&(n), 1)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
763eccf8
...
...
@@ -229,12 +229,112 @@ void ctgDbgShowClusterCache(SCatalog* pCtg) {
}
void
ctgFreeMetaRent
(
SCtgRentMgmt
*
mgmt
)
{
if
(
NULL
==
mgmt
->
slots
)
{
return
;
}
for
(
int32_t
i
=
0
;
i
<
mgmt
->
slotNum
;
++
i
)
{
SCtgRentSlot
*
slot
=
&
mgmt
->
slots
[
i
];
if
(
slot
->
meta
)
{
taosArrayDestroy
(
slot
->
meta
);
slot
->
meta
=
NULL
;
}
}
tfree
(
mgmt
->
slots
);
}
void
ctgFreeTableMetaCache
(
SCtgTbMetaCache
*
cache
)
{
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
if
(
cache
->
stbCache
)
{
taosHashCleanup
(
cache
->
stbCache
);
cache
->
stbCache
=
NULL
;
}
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
if
(
cache
->
metaCache
)
{
taosHashCleanup
(
cache
->
metaCache
);
cache
->
metaCache
=
NULL
;
}
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
}
void
ctgFreeVgInfo
(
SDBVgInfo
*
vgInfo
)
{
if
(
NULL
==
vgInfo
)
{
return
;
}
if
(
vgInfo
->
vgHash
)
{
taosHashCleanup
(
vgInfo
->
vgHash
);
vgInfo
->
vgHash
=
NULL
;
}
tfree
(
vgInfo
);
}
void
ctgFreeDbCache
(
SCtgDBCache
*
dbCache
)
{
if
(
NULL
==
dbCache
)
{
return
;
}
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
ctgFreeVgInfo
(
dbCache
->
vgInfo
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
ctgFreeTableMetaCache
(
&
dbCache
->
tbCache
);
}
void
ctgFreeHandle
(
SCatalog
*
pCtg
)
{
ctgFreeMetaRent
(
&
pCtg
->
dbRent
);
ctgFreeMetaRent
(
&
pCtg
->
stbRent
);
if
(
pCtg
->
dbCache
)
{
void
*
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
NULL
);
while
(
pIter
)
{
SCtgDBCache
*
dbCache
=
pIter
;
atomic_store_8
(
&
dbCache
->
deleted
,
1
);
ctgFreeDbCache
(
dbCache
);
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
pIter
);
}
taosHashCleanup
(
pCtg
->
dbCache
);
}
free
(
pCtg
);
}
void
ctgWaitAction
(
SCtgMetaAction
*
action
)
{
while
(
true
)
{
tsem_wait
(
&
gCtgMgmt
.
queue
.
rspSem
);
if
(
atomic_load_8
(
&
gCtgMgmt
.
exit
))
{
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
break
;
}
if
(
gCtgMgmt
.
queue
.
seqDone
>=
action
->
seqId
)
{
break
;
}
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
sched_yield
();
}
}
void
ctgPopAction
(
SCtgMetaAction
**
action
)
{
SCtgQNode
*
orig
=
gCtgMgmt
.
head
;
SCtgQNode
*
orig
=
gCtgMgmt
.
queue
.
head
;
SCtgQNode
*
node
=
gCtgMgmt
.
head
->
next
;
gCtgMgmt
.
head
=
gCtgMgmt
.
head
->
next
;
SCtgQNode
*
node
=
gCtgMgmt
.
queue
.
head
->
next
;
gCtgMgmt
.
queue
.
head
=
gCtgMgmt
.
queue
.
head
->
next
;
CTG_QUEUE_SUB
();
...
...
@@ -244,43 +344,34 @@ void ctgPopAction(SCtgMetaAction **action) {
}
int32_t
ctgPushAction
(
SCtgMetaAction
*
action
)
{
int32_t
ctgPushAction
(
SC
atalog
*
pCtg
,
SC
tgMetaAction
*
action
)
{
SCtgQNode
*
node
=
calloc
(
1
,
sizeof
(
SCtgQNode
));
if
(
NULL
==
node
)
{
qError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgQNode
));
CTG_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
action
->
seqId
=
atomic_add_fetch_64
(
&
gCtgMgmt
.
queue
.
seqId
,
1
);
node
->
action
=
*
action
;
CTG_LOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
qlock
);
gCtgMgmt
.
tail
->
next
=
node
;
gCtgMgmt
.
tail
=
node
;
CTG_UNLOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
qlock
);
CTG_LOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
q
ueue
.
q
lock
);
gCtgMgmt
.
queue
.
tail
->
next
=
node
;
gCtgMgmt
.
queue
.
tail
=
node
;
CTG_UNLOCK
(
CTG_WRITE
,
&
gCtgMgmt
.
q
ueue
.
q
lock
);
CTG_QUEUE_ADD
();
CTG_STAT_ADD
(
gCtgMgmt
.
stat
.
runtime
.
qNum
);
tsem_post
(
&
gCtgMgmt
.
sem
);
return
TSDB_CODE_SUCCESS
;
}
tsem_post
(
&
gCtgMgmt
.
queue
.
reqSem
);
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
->
act
].
name
);
void
ctgFreeMetaRent
(
SCtgRentMgmt
*
mgmt
)
{
if
(
NULL
==
mgmt
->
slots
)
{
return
;
}
for
(
int32_t
i
=
0
;
i
<
mgmt
->
slotNum
;
++
i
)
{
SCtgRentSlot
*
slot
=
&
mgmt
->
slots
[
i
];
if
(
slot
->
meta
)
{
taosArrayDestroy
(
slot
->
meta
);
slot
->
meta
=
NULL
;
}
if
(
action
->
syncReq
)
{
ctgWaitAction
(
action
);
}
tfree
(
mgmt
->
slots
)
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -299,9 +390,7 @@ int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
CTG_ERR_JRET
(
ctgPushAction
(
pCtg
,
&
action
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -329,9 +418,7 @@ int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
CTG_ERR_JRET
(
ctgPushAction
(
pCtg
,
&
action
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -359,9 +446,7 @@ int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
CTG_ERR_JRET
(
ctgPushAction
(
pCtg
,
&
action
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -371,69 +456,32 @@ _return:
CTG_RET
(
code
);
}
void
ctgFreeTableMetaCache
(
SCtgTbMetaCache
*
cache
)
{
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
if
(
cache
->
stbCache
)
{
taosHashCleanup
(
cache
->
stbCache
);
cache
->
stbCache
=
NULL
;
}
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
stbLock
);
CTG_LOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
if
(
cache
->
metaCache
)
{
taosHashCleanup
(
cache
->
metaCache
);
cache
->
metaCache
=
NULL
;
}
CTG_UNLOCK
(
CTG_WRITE
,
&
cache
->
metaLock
);
}
void
ctgFreeVgInfo
(
SDBVgInfo
*
vgInfo
)
{
if
(
NULL
==
vgInfo
)
{
return
;
}
if
(
vgInfo
->
vgHash
)
{
taosHashCleanup
(
vgInfo
->
vgHash
);
vgInfo
->
vgHash
=
NULL
;
}
tfree
(
vgInfo
);
}
void
ctgFreeDbCache
(
SCtgDBCache
*
dbCache
)
{
if
(
NULL
==
dbCache
)
{
return
;
int32_t
ctgPushUpdateVgMsgInQueue
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int64_t
dbId
,
SDBVgInfo
*
dbInfo
,
bool
syncReq
)
{
int32_t
code
=
0
;
SCtgMetaAction
action
=
{.
act
=
CTG_ACT_UPDATE_VG
,
.
syncReq
=
syncReq
};
SCtgUpdateVgMsg
*
msg
=
malloc
(
sizeof
(
SCtgUpdateVgMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgUpdateVgMsg
));
ctgFreeVgInfo
(
dbInfo
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
CTG_LOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
ctgFreeVgInfo
(
dbCache
->
vgInfo
);
CTG_UNLOCK
(
CTG_WRITE
,
&
dbCache
->
vgLock
);
ctgFreeTableMetaCache
(
&
dbCache
->
tbCache
);
}
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
msg
->
pCtg
=
pCtg
;
msg
->
dbId
=
dbId
;
msg
->
dbInfo
=
dbInfo
;
action
.
data
=
msg
;
void
ctgFreeHandle
(
SCatalog
*
pCtg
)
{
ctgFreeMetaRent
(
&
pCtg
->
dbRent
);
ctgFreeMetaRent
(
&
pCtg
->
stbRent
);
if
(
pCtg
->
dbCache
)
{
void
*
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
NULL
);
while
(
pIter
)
{
SCtgDBCache
*
dbCache
=
pIter
;
CTG_ERR_JRET
(
ctgPushAction
(
pCtg
,
&
action
));
atomic_store_8
(
&
dbCache
->
deleted
,
1
)
;
return
TSDB_CODE_SUCCESS
;
ctgFreeDbCache
(
dbCache
);
pIter
=
taosHashIterate
(
pCtg
->
dbCache
,
pIter
);
}
_return:
taosHashCleanup
(
pCtg
->
dbCache
);
}
free
(
pCtg
);
ctgFreeVgInfo
(
dbInfo
);
tfree
(
action
.
data
);
CTG_RET
(
code
);
}
...
...
@@ -1591,38 +1639,57 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const
}
CTG_ERR_JRET
(
ctgCloneVgInfo
(
DbOut
.
dbVgroup
,
pInfo
));
SCtgMetaAction
action
=
{.
act
=
CTG_ACT_UPDATE_VG
};
SCtgUpdateVgMsg
*
msg
=
malloc
(
sizeof
(
SCtgUpdateVgMsg
));
if
(
NULL
==
msg
)
{
ctgError
(
"malloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgUpdateVgMsg
));
ctgFreeVgInfo
(
DbOut
.
dbVgroup
);
CTG_ERR_RET
(
TSDB_CODE_CTG_MEM_ERROR
);
}
strncpy
(
msg
->
dbFName
,
dbFName
,
sizeof
(
msg
->
dbFName
));
msg
->
pCtg
=
pCtg
;
msg
->
dbId
=
DbOut
.
dbId
;
msg
->
dbInfo
=
DbOut
.
dbVgroup
;
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
CTG_ERR_RET
(
ctgPushUpdateVgMsgInQueue
(
pCtg
,
dbFName
,
DbOut
.
dbId
,
DbOut
.
dbVgroup
,
false
));
return
TSDB_CODE_SUCCESS
;
_return:
tfree
(
*
pInfo
);
tfree
(
msg
);
*
pInfo
=
DbOut
.
dbVgroup
;
CTG_RET
(
code
);
}
int32_t
ctgRefreshDBVgInfo
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
)
{
bool
inCache
=
false
;
int32_t
code
=
0
;
SCtgDBCache
**
dbCache
=
NULL
;
CTG_ERR_RET
(
ctgAcquireVgInfoFromCache
(
pCtg
,
dbFName
,
dbCache
,
&
inCache
));
SUseDbOutput
DbOut
=
{
0
};
SBuildUseDBInput
input
=
{
0
};
tstrncpy
(
input
.
db
,
dbFName
,
tListLen
(
input
.
db
));
if
(
inCache
)
{
input
.
dbId
=
(
*
dbCache
)
->
dbId
;
input
.
vgVersion
=
(
*
dbCache
)
->
vgInfo
->
vgVersion
;
input
.
numOfTable
=
(
*
dbCache
)
->
vgInfo
->
numOfTable
;
ctgReleaseVgInfo
(
*
dbCache
);
ctgReleaseDBCache
(
pCtg
,
*
dbCache
);
}
else
{
input
.
vgVersion
=
CTG_DEFAULT_INVALID_VERSION
;
}
code
=
ctgGetDBVgInfoFromMnode
(
pCtg
,
pRpc
,
pMgmtEps
,
&
input
,
&
DbOut
);
if
(
code
)
{
if
(
CTG_DB_NOT_EXIST
(
code
)
&&
input
.
vgVersion
>
CTG_DEFAULT_INVALID_VERSION
)
{
ctgDebug
(
"db no longer exist, dbFName:%s, dbId:%"
PRIx64
,
input
.
db
,
input
.
dbId
);
ctgPushRmDBMsgInQueue
(
pCtg
,
input
.
db
,
input
.
dbId
);
}
CTG_ERR_RET
(
code
);
}
CTG_ERR_RET
(
ctgPushUpdateVgMsgInQueue
(
pCtg
,
dbFName
,
DbOut
.
dbId
,
DbOut
.
dbVgroup
,
true
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgCloneMetaOutput
(
STableMetaOutput
*
output
,
STableMetaOutput
**
pOutput
)
{
*
pOutput
=
malloc
(
sizeof
(
STableMetaOutput
));
...
...
@@ -1746,9 +1813,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
CTG_ERR_JRET
(
ctgPushAction
(
pCtg
,
&
action
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2042,9 +2107,10 @@ void* ctgUpdateThreadFunc(void* param) {
CTG_LOCK
(
CTG_READ
,
&
gCtgMgmt
.
lock
);
while
(
true
)
{
tsem_wait
(
&
gCtgMgmt
.
s
em
);
tsem_wait
(
&
gCtgMgmt
.
queue
.
reqS
em
);
if
(
atomic_load_8
(
&
gCtgMgmt
.
exit
))
{
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
break
;
}
...
...
@@ -2056,6 +2122,12 @@ void* ctgUpdateThreadFunc(void* param) {
(
*
gCtgAction
[
action
->
act
].
func
)(
action
);
gCtgMgmt
.
queue
.
seqDone
=
action
->
seqId
;
if
(
action
->
syncReq
)
{
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
}
CTG_STAT_ADD
(
gCtgMgmt
.
stat
.
runtime
.
qDoneNum
);
ctgDbgShowClusterCache
(
pCtg
);
...
...
@@ -2125,14 +2197,15 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET
(
ctgStartUpdateThread
());
tsem_init
(
&
gCtgMgmt
.
sem
,
0
,
0
);
tsem_init
(
&
gCtgMgmt
.
queue
.
reqSem
,
0
,
0
);
tsem_init
(
&
gCtgMgmt
.
queue
.
rspSem
,
0
,
0
);
gCtgMgmt
.
head
=
calloc
(
1
,
sizeof
(
SCtgQNode
));
if
(
NULL
==
gCtgMgmt
.
head
)
{
gCtgMgmt
.
queue
.
head
=
calloc
(
1
,
sizeof
(
SCtgQNode
));
if
(
NULL
==
gCtgMgmt
.
queue
.
head
)
{
qError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SCtgQNode
));
CTG_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
gCtgMgmt
.
tail
=
gCtgMgmt
.
head
;
gCtgMgmt
.
queue
.
tail
=
gCtgMgmt
.
queue
.
head
;
qDebug
(
"catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u"
,
gCtgMgmt
.
cfg
.
maxDBCacheNum
,
gCtgMgmt
.
cfg
.
maxTblCacheNum
,
gCtgMgmt
.
cfg
.
dbRentSec
,
gCtgMgmt
.
cfg
.
stbRentSec
);
...
...
@@ -2332,12 +2405,10 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
CTG_ERR_JRET
(
ctgPushAction
(
pCtg
,
&
action
));
dbInfo
=
NULL
;
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
CTG_API_LEAVE
(
code
);
_return:
...
...
@@ -2443,9 +2514,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) {
action
.
data
=
msg
;
CTG_ERR_JRET
(
ctgPushAction
(
&
action
));
ctgDebug
(
"action [%s] added into queue"
,
gCtgAction
[
action
.
act
].
name
);
CTG_ERR_JRET
(
ctgPushAction
(
pCtg
,
&
action
));
CTG_API_LEAVE
(
code
);
...
...
@@ -2458,6 +2527,15 @@ _return:
CTG_API_LEAVE
(
code
);
}
int32_t
catalogRefreshDBVgInfo
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
)
{
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
pTrans
||
NULL
==
pMgmtEps
||
NULL
==
dbFName
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_API_LEAVE
(
ctgRefreshDBVgInfo
(
pCtg
,
pTrans
,
pMgmtEps
,
dbFName
));
}
int32_t
catalogRefreshTableMeta
(
SCatalog
*
pCtg
,
void
*
pTrans
,
const
SEpSet
*
pMgmtEps
,
const
SName
*
pTableName
,
int32_t
isSTable
)
{
CTG_API_ENTER
();
...
...
@@ -2702,7 +2780,8 @@ void catalogDestroy(void) {
atomic_store_8
(
&
gCtgMgmt
.
exit
,
true
);
tsem_post
(
&
gCtgMgmt
.
sem
);
tsem_post
(
&
gCtgMgmt
.
queue
.
reqSem
);
tsem_post
(
&
gCtgMgmt
.
queue
.
rspSem
);
while
(
CTG_IS_LOCKED
(
&
gCtgMgmt
.
lock
))
{
usleep
(
1
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
763eccf8
...
...
@@ -721,7 +721,7 @@ int32_t getMaximumIdleDurationSec();
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
SQueryErrorInfo
*
errInfo
);
#ifdef __cplusplus
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
763eccf8
...
...
@@ -84,7 +84,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
}
qTaskInfo_t
pTaskInfo
=
NULL
;
code
=
qCreateExecTask
(
streamReadHandle
,
0
,
0
,
plan
,
&
pTaskInfo
,
NULL
);
code
=
qCreateExecTask
(
streamReadHandle
,
0
,
0
,
plan
,
&
pTaskInfo
,
NULL
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// TODO: destroy SSubplan & pTaskInfo
terrno
=
code
;
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
763eccf8
...
...
@@ -51,11 +51,11 @@ static void freeqinfoFn(void *qhandle) {
qDestroyTask
(
*
handle
);
}
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
)
{
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
SQueryErrorInfo
*
errInfo
)
{
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
errInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
763eccf8
...
...
@@ -8072,7 +8072,8 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
static
int32_t
doCreateTableGroup
(
void
*
metaHandle
,
int32_t
tableType
,
uint64_t
tableUid
,
STableGroupInfo
*
pGroupInfo
,
uint64_t
queryId
,
uint64_t
taskId
);
SOperatorInfo
*
doCreateOperatorTreeNode
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
queryId
,
uint64_t
taskId
,
STableGroupInfo
*
pTableGroupInfo
)
{
int32_t
doCreateOperatorTreeNode
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
queryId
,
uint64_t
taskId
,
STableGroupInfo
*
pTableGroupInfo
,
SQueryErrorInfo
*
errInfo
)
{
int32_t
code
=
0
;
if
(
nodeType
(
pPhyNode
)
==
QUERY_NODE_PHYSICAL_PLAN_PROJECT
)
{
// ignore the project node
pPhyNode
=
nodesListGetNode
(
pPhyNode
->
pChildren
,
0
);
}
...
...
@@ -8081,11 +8082,21 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
==
nodeType
(
pPhyNode
))
{
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
char
tableFName
[
TSDB_TABLE_FNAME_LEN
];
tNameExtractFullName
(
&
pScanPhyNode
->
tableName
,
tableFName
);
code
=
vnodeValidateTableHash
(
pHandle
->
config
,
tableFName
);
if
(
code
)
{
errInfo
->
code
=
code
;
errInfo
->
tableName
=
pScanPhyNode
->
tableName
;
return
code
;
}
size_t
numOfCols
=
LIST_LENGTH
(
pScanPhyNode
->
pScanCols
);
tsdbReaderT
pDataReader
=
doCreateDataReader
((
STableScanPhysiNode
*
)
pPhyNode
,
pHandle
,
(
uint64_t
)
queryId
,
taskId
);
int32_t
code
=
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
return
createTableScanOperatorInfo
(
pDataReader
,
pScanPhyNode
->
order
,
numOfCols
,
pScanPhyNode
->
count
,
pScanPhyNode
->
reverse
,
pTaskInfo
);
code
=
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
pTaskInfo
->
pRoot
=
createTableScanOperatorInfo
(
pDataReader
,
pScanPhyNode
->
order
,
numOfCols
,
pScanPhyNode
->
count
,
pScanPhyNode
->
reverse
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
nodeType
(
pPhyNode
))
{
// SExchangePhysiNode* pEx = (SExchangePhysiNode*) pPhyNode;
// return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
...
...
@@ -8093,7 +8104,11 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
// simple child table.
STableGroupInfo
groupInfo
=
{
0
};
int32_t
code
=
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
&
groupInfo
,
queryId
,
taskId
);
code
=
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
&
groupInfo
,
queryId
,
taskId
);
if
(
code
)
{
return
code
;
}
SArray
*
idList
=
NULL
;
if
(
groupInfo
.
numOfTables
>
0
)
{
...
...
@@ -8126,12 +8141,15 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
i
);
SOperatorInfo
*
op
=
doCreateOperatorTreeNode
(
pChildNode
,
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
pTableGroupInfo
);
code
=
doCreateOperatorTreeNode
(
pChildNode
,
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
pTableGroupInfo
,
errInfo
);
if
(
code
)
{
return
code
;
}
int32_t
resultRowSize
=
0
;
SArray
*
pExprInfo
=
createExprInfo
((
SAggPhysiNode
*
)
pPhyNode
,
&
resultRowSize
);
SSDataBlock
*
pResBlock
=
createOutputBuf_rv1
(
pPhyNode
->
pOutputDataBlockDesc
);
return
createAggregateOperatorInfo
(
op
,
pExprInfo
,
pResBlock
,
pTaskInfo
,
pTableGroupInfo
);
pTaskInfo
->
pRoot
=
createAggregateOperatorInfo
(
pTaskInfo
->
pRoot
,
pExprInfo
,
pResBlock
,
pTaskInfo
,
pTableGroupInfo
);
}
}
/*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
size_t size = taosArrayGetSize(pPhyNode->pChildren);
...
...
@@ -8143,6 +8161,12 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
}
}*/
if
(
pTaskInfo
->
pRoot
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
return
code
;
}
static
tsdbReaderT
createDataReaderImpl
(
STableScanPhysiNode
*
pTableScanNode
,
STableGroupInfo
*
pGroupInfo
,
void
*
readHandle
,
uint64_t
queryId
,
uint64_t
taskId
)
{
...
...
@@ -8205,7 +8229,7 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
return
NULL
;
}
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
)
{
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
SQueryErrorInfo
*
errInfo
)
{
uint64_t
queryId
=
pPlan
->
id
.
queryId
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -8216,9 +8240,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
STableGroupInfo
group
=
{
0
};
(
*
pTaskInfo
)
->
pRoot
=
doCreateOperatorTreeNode
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
&
group
);
if
((
*
pTaskInfo
)
->
pRoot
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
code
=
doCreateOperatorTreeNode
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
&
group
,
errInfo
);
if
(
code
)
{
goto
_complete
;
}
...
...
source/libs/executor/test/executorTests.cpp
浏览文件 @
763eccf8
...
...
@@ -946,7 +946,7 @@ TEST(testCase, build_executor_tree_Test) {
int32_t
code
=
qStringToSubplan
(
msg
,
&
plan
);
ASSERT_EQ
(
code
,
0
);
code
=
qCreateExecTask
(
&
handle
,
2
,
1
,
plan
,
(
void
**
)
&
pTaskInfo
,
&
sinkHandle
);
code
=
qCreateExecTask
(
&
handle
,
2
,
1
,
plan
,
(
void
**
)
&
pTaskInfo
,
&
sinkHandle
,
NULL
);
ASSERT_EQ
(
code
,
0
);
}
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
763eccf8
...
...
@@ -161,3 +161,17 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp
rpcSendRequest
(
pTransporter
,
epSet
,
&
rpcMsg
,
pTransporterId
);
return
TSDB_CODE_SUCCESS
;
}
SSchema
createSchema
(
uint8_t
type
,
int32_t
bytes
,
int32_t
colId
,
const
char
*
name
)
{
SSchema
s
=
{
0
};
s
.
type
=
type
;
s
.
bytes
=
bytes
;
s
.
colId
=
colId
;
tstrncpy
(
s
.
name
,
name
,
tListLen
(
s
.
name
));
return
s
;
}
source/libs/qworker/inc/qworkerMsg.h
浏览文件 @
763eccf8
...
...
@@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
,
bool
qComplete
);
int32_t
qwBuildAndSendCQueryMsg
(
QW_FPARAMS_DEF
,
void
*
connection
);
int32_t
qwBuildAndSendReadyRsp
(
void
*
connection
,
int32_t
code
);
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
);
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
,
SQueryErrorInfo
*
errInfo
);
void
qwFreeFetchRsp
(
void
*
msg
);
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
);
int32_t
qwGetSchTasksStatus
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SSchedulerStatusRsp
**
rsp
);
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
763eccf8
...
...
@@ -998,6 +998,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
qTaskInfo_t
pTaskInfo
=
NULL
;
DataSinkHandle
sinkHandle
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
SQueryErrorInfo
errInfo
=
{
0
};
QW_ERR_JRET
(
qwHandlePrePhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_PRE_QUERY
,
&
input
,
&
output
));
...
...
@@ -1019,7 +1020,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET
(
code
);
}
code
=
qCreateExecTask
(
qwMsg
->
node
,
0
,
tId
,
(
struct
SSubplan
*
)
plan
,
&
pTaskInfo
,
&
sinkHandle
);
code
=
qCreateExecTask
(
qwMsg
->
node
,
0
,
tId
,
(
struct
SSubplan
*
)
plan
,
&
pTaskInfo
,
&
sinkHandle
,
&
errInfo
);
if
(
code
)
{
QW_TASK_ELOG
(
"qCreateExecTask failed, code:%s"
,
tstrerror
(
code
));
QW_ERR_JRET
(
code
);
...
...
@@ -1032,7 +1033,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
//TODO OPTIMIZE EMTYP RESULT QUERY RSP TO AVOID FURTHER FETCH
QW_ERR_JRET
(
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
code
));
QW_ERR_JRET
(
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
code
,
NULL
));
QW_TASK_DLOG
(
"query msg rsped, code:%d"
,
code
);
queryRsped
=
true
;
...
...
@@ -1051,7 +1052,7 @@ _return:
}
if
(
!
queryRsped
)
{
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
rspCode
);
qwBuildAndSendQueryRsp
(
qwMsg
->
connection
,
rspCode
,
&
errInfo
);
QW_TASK_DLOG
(
"query msg rsped, code:%x"
,
rspCode
);
}
...
...
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
763eccf8
...
...
@@ -44,17 +44,23 @@ void qwFreeFetchRsp(void *msg) {
}
}
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
)
{
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
,
SQueryErrorInfo
*
errInfo
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
code
=
code
;
SQueryTableRsp
rsp
=
{.
code
=
code
};
if
(
errInfo
&&
errInfo
->
code
)
{
rsp
.
tableName
=
errInfo
->
tableName
;
}
int32_t
contLen
=
tSerializeSQueryTableRsp
(
NULL
,
0
,
&
rsp
);
void
*
msg
=
rpcMallocCont
(
contLen
);
tSerializeSQueryTableRsp
(
msg
,
contLen
,
&
rsp
);
SRpcMsg
rpcRsp
=
{
.
msgType
=
TDMT_VND_QUERY_RSP
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
)
,
.
pCont
=
msg
,
.
contLen
=
contLen
,
.
code
=
code
,
};
...
...
source/libs/qworker/test/qworkerTests.cpp
浏览文件 @
763eccf8
...
...
@@ -262,7 +262,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
return
;
}
int32_t
qwtCreateExecTask
(
void
*
tsdb
,
int32_t
vgId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
)
{
int32_t
qwtCreateExecTask
(
void
*
tsdb
,
int32_t
vgId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
SQueryErrorInfo
*
errInfo
)
{
int32_t
idx
=
abs
((
++
qwtTestCaseIdx
)
%
qwtTestCaseNum
);
qwtTestSinkBlockNum
=
0
;
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
763eccf8
...
...
@@ -136,6 +136,7 @@ typedef struct SSchJob {
uint64_t
queryId
;
SSchJobAttr
attr
;
int32_t
levelNum
;
int32_t
taskNum
;
void
*
transport
;
SArray
*
nodeList
;
// qnode/vnode list, element is SQueryNodeAddr
SArray
*
levels
;
// Element is SQueryLevel, starting from 0. SArray<SSchLevel>
...
...
@@ -154,7 +155,8 @@ typedef struct SSchJob {
int32_t
remoteFetch
;
SSchTask
*
fetchTask
;
int32_t
errCode
;
void
*
res
;
//TODO free it or not
SArray
*
errList
;
// SArray<SQueryErrorInfo>
void
*
resData
;
//TODO free it or not
int32_t
resNumOfRows
;
const
char
*
sql
;
SQueryProfileSummary
summary
;
...
...
@@ -168,9 +170,9 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_
TASK_NEED_WAIT_ALL(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY
)
#define SCH_
TASK_NO_NEED_DROP(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY
)
#define SCH_IS_DATA_SRC_
QRY_
TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_
IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)
)
#define SCH_
IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum
)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
...
...
@@ -180,12 +182,14 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_
QRY_
TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEAF_TASK(_job, _task) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job)
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epset.eps[(_addr)->epset.inUse])
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epset.inUse = ((_addr)->epset.inUse + 1) % (_addr)->epset.numOfEps)
...
...
@@ -219,7 +223,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t
schLaunchTasksInFlowCtrlList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schLaunchTaskImpl
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schFetchFromRemote
(
SSchJob
*
pJob
);
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
);
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
SQueryErrorInfo
*
errInfo
);
#ifdef __cplusplus
...
...
source/libs/scheduler/src/schFlowCtrl.c
浏览文件 @
763eccf8
...
...
@@ -259,7 +259,7 @@ _return:
SCH_UNLOCK
(
SCH_WRITE
,
&
ctrl
->
lock
);
if
(
code
)
{
code
=
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
);
code
=
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
,
NULL
);
}
SCH_RET
(
code
);
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
763eccf8
...
...
@@ -410,6 +410,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_TASK_ELOG
(
"taosHashPut to planToTaks failed, taskIdx:%d"
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
++
pJob
->
taskNum
;
}
SCH_JOB_DLOG
(
"level initialized, taskNum:%d"
,
taskNum
);
...
...
@@ -588,7 +590,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return
TSDB_CODE_SUCCESS
;
//TODO CHECK epList/condidateList
if
(
SCH_IS_DATA_SRC_TASK
(
pTask
))
{
if
(
SCH_IS_DATA_SRC_
QRY_
TASK
(
pTask
))
{
}
else
{
int32_t
candidateNum
=
taosArrayGetSize
(
pTask
->
candidateAddrs
);
...
...
@@ -611,7 +613,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET
(
schLaunchTasksInFlowCtrlList
(
pJob
,
pTask
));
}
if
(
SCH_IS_DATA_SRC_TASK
(
pTask
))
{
if
(
SCH_IS_DATA_SRC_
QRY_
TASK
(
pTask
))
{
SCH_SWITCH_EPSET
(
&
pTask
->
plan
->
execNode
);
}
else
{
++
pTask
->
candidateIdx
;
...
...
@@ -727,8 +729,32 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
tsem_post
(
&
job
->
rspSem
);
}
int32_t
schPushToErrInfoList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SQueryErrorInfo
*
errInfo
)
{
if
(
NULL
==
errInfo
||
!
SCH_IS_DATA_SRC_TASK
(
pTask
)
||
!
IS_CLIENT_RETRY_ERROR
(
errInfo
->
code
))
{
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
==
pJob
->
errList
)
{
SSchLevel
*
level
=
taosArrayGetLast
(
pJob
->
levels
);
pJob
->
errList
=
taosArrayInit
(
level
->
taskNum
,
sizeof
(
SQueryErrorInfo
));
if
(
NULL
==
pJob
->
errList
)
{
SCH_TASK_ELOG
(
"taosArrayInit %d errInfofailed"
,
pJob
->
taskNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
if
(
NULL
==
taosArrayPush
(
pJob
->
errList
,
errInfo
))
{
SCH_TASK_ELOG
(
"taosArrayPush errInfo to list failed, errCode:%x"
,
errInfo
->
code
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
return
TSDB_CODE_SUCCESS
;
}
// Note: no more task error processing, handled in function internal
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
)
{
int32_t
schProcessOnTaskFailure
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
errCode
,
SQueryErrorInfo
*
errInfo
)
{
int8_t
status
=
0
;
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
...
...
@@ -757,8 +783,10 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
}
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_FAILED
);
SCH_ERR_JRET
(
schPushToErrInfoList
(
pJob
,
pTask
,
errInfo
));
if
(
SCH_
TASK_NEED_WAIT_ALL
(
pTask
))
{
if
(
SCH_
IS_WAIT_ALL_JOB
(
pJob
))
{
SCH_LOCK
(
SCH_WRITE
,
&
pTask
->
level
->
lock
);
pTask
->
level
->
taskFailed
++
;
taskDone
=
pTask
->
level
->
taskSucceed
+
pTask
->
level
->
taskFailed
;
...
...
@@ -801,7 +829,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
if
(
parentNum
==
0
)
{
int32_t
taskDone
=
0
;
if
(
SCH_
TASK_NEED_WAIT_ALL
(
pTask
))
{
if
(
SCH_
IS_WAIT_ALL_JOB
(
pJob
))
{
SCH_LOCK
(
SCH_WRITE
,
&
pTask
->
level
->
lock
);
pTask
->
level
->
taskSucceed
++
;
taskDone
=
pTask
->
level
->
taskSucceed
+
pTask
->
level
->
taskFailed
;
...
...
@@ -870,11 +898,11 @@ int32_t schFetchFromRemote(SSchJob *pJob) {
return
TSDB_CODE_SUCCESS
;
}
void
*
res
=
atomic_load_ptr
(
&
pJob
->
res
);
if
(
res
)
{
void
*
res
Data
=
atomic_load_ptr
(
&
pJob
->
resData
);
if
(
res
Data
)
{
atomic_val_compare_exchange_32
(
&
pJob
->
remoteFetch
,
1
,
0
);
SCH_JOB_DLOG
(
"res already fetched, res:%p"
,
res
);
SCH_JOB_DLOG
(
"res already fetched, res:%p"
,
res
Data
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -886,7 +914,7 @@ _return:
atomic_val_compare_exchange_32
(
&
pJob
->
remoteFetch
,
1
,
0
);
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pJob
->
fetchTask
,
code
));
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pJob
->
fetchTask
,
code
,
NULL
));
}
...
...
@@ -894,6 +922,8 @@ _return:
int32_t
schHandleResponseMsg
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
int32_t
msgType
,
char
*
msg
,
int32_t
msgSize
,
int32_t
rspCode
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
bool
errInfoGot
=
false
;
SQueryErrorInfo
errInfo
=
{
0
};
if
(
schJobNeedToStop
(
pJob
,
&
status
))
{
SCH_TASK_ELOG
(
"rsp not processed cause of job status, job status:%d"
,
status
);
...
...
@@ -933,13 +963,23 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break
;
}
case
TDMT_VND_QUERY_RSP
:
{
SQueryTableRsp
*
rsp
=
(
SQueryTableRsp
*
)
msg
;
SQueryTableRsp
rsp
=
{
0
};
if
(
msg
)
{
tDeserializeSQueryTableRsp
(
msg
,
msgSize
,
&
rsp
);
if
(
rsp
.
code
)
{
errInfo
.
code
=
rsp
.
code
;
errInfo
.
tableName
=
rsp
.
tableName
;
errInfoGot
=
true
;
}
SCH_ERR_JRET
(
rsp
.
code
);
}
SCH_ERR_JRET
(
rspCode
);
if
(
NULL
==
msg
)
{
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SCH_ERR_JRET
(
rsp
->
code
);
SCH_ERR_JRET
(
schBuildAndSendMsg
(
pJob
,
pTask
,
NULL
,
TDMT_VND_RES_READY
));
...
...
@@ -966,13 +1006,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
if
(
pJob
->
res
)
{
SCH_TASK_ELOG
(
"got fetch rsp while res already exists, res:%p"
,
pJob
->
res
);
if
(
pJob
->
res
Data
)
{
SCH_TASK_ELOG
(
"got fetch rsp while res already exists, res:%p"
,
pJob
->
res
Data
);
tfree
(
rsp
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
atomic_store_ptr
(
&
pJob
->
res
,
rsp
);
atomic_store_ptr
(
&
pJob
->
res
Data
,
rsp
);
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
htonl
(
rsp
->
numOfRows
));
if
(
rsp
->
completed
)
{
...
...
@@ -999,7 +1039,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
_return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
,
errInfoGot
?
&
errInfo
:
NULL
));
}
...
...
@@ -1423,7 +1463,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
_return:
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
));
SCH_RET
(
schProcessOnTaskFailure
(
pJob
,
pTask
,
code
,
NULL
));
}
int32_t
schLaunchLevelTasks
(
SSchJob
*
pJob
,
SSchLevel
*
level
)
{
...
...
@@ -1474,13 +1514,15 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
}
void
schDropTaskInHashList
(
SSchJob
*
pJob
,
SHashObj
*
list
)
{
if
(
!
SCH_IS_NEED_DROP_JOB
(
pJob
))
{
return
;
}
void
*
pIter
=
taosHashIterate
(
list
,
NULL
);
while
(
pIter
)
{
SSchTask
*
pTask
=
*
(
SSchTask
**
)
pIter
;
if
(
!
SCH_TASK_NO_NEED_DROP
(
pTask
))
{
schDropTaskOnExecutedNode
(
pJob
,
pTask
);
}
schDropTaskOnExecutedNode
(
pJob
,
pTask
);
pIter
=
taosHashIterate
(
list
,
pIter
);
}
...
...
@@ -1537,8 +1579,9 @@ void schFreeJobImpl(void *job) {
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
tfree
(
pJob
->
res
);
taosArrayDestroy
(
pJob
->
errList
);
tfree
(
pJob
->
resData
);
tfree
(
pJob
);
...
...
@@ -1673,8 +1716,12 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, in
SCH_ERR_RET
(
schExecJobImpl
(
transport
,
nodeList
,
pDag
,
pJob
,
sql
,
true
));
SSchJob
*
job
=
schAcquireJob
(
*
pJob
);
pRes
->
code
=
atomic_load_32
(
&
job
->
errCode
);
pRes
->
numOfRows
=
job
->
resNumOfRows
;
pRes
->
errList
=
job
->
errList
;
job
->
errList
=
NULL
;
schReleaseJob
(
*
pJob
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1862,14 +1909,14 @@ int32_t schedulerFetchRows(int64_t job, void** pData) {
SCH_ERR_JRET
(
atomic_load_32
(
&
pJob
->
errCode
));
}
if
(
pJob
->
res
&&
((
SRetrieveTableRsp
*
)
pJob
->
res
)
->
completed
)
{
if
(
pJob
->
res
Data
&&
((
SRetrieveTableRsp
*
)
pJob
->
resData
)
->
completed
)
{
SCH_ERR_JRET
(
schCheckAndUpdateJobStatus
(
pJob
,
JOB_TASK_STATUS_SUCCEED
));
}
while
(
true
)
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
res
,
*
pData
,
NULL
))
{
*
pData
=
atomic_load_ptr
(
&
pJob
->
res
Data
);
if
(
*
pData
!=
atomic_val_compare_exchange_ptr
(
&
pJob
->
res
Data
,
*
pData
,
NULL
))
{
continue
;
}
...
...
source/util/src/terror.c
浏览文件 @
763eccf8
...
...
@@ -324,6 +324,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operat
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_IS_SYNCING
,
"Database is syncing"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_INVALID_TSDB_STATE
,
"Invalid tsdb state"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_TB_NOT_EXIST
,
"Table not exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_VND_HASH_MISMATCH
,
"Hash value mismatch"
)
// tsdb
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_INVALID_TABLE_ID
,
"Invalid table ID"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录