Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
190b7029
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
190b7029
编写于
4月 29, 2022
作者:
D
dapan1121
提交者:
GitHub
4月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12017 from taosdata/feature/qnode
fix: fix hb crash issue
上级
08504bd9
6973e355
变更
20
展开全部
隐藏空白更改
内联
并排
Showing
20 changed file
with
558 addition
and
195 deletion
+558
-195
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+2
-0
source/client/inc/clientStmt.h
source/client/inc/clientStmt.h
+14
-13
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+44
-9
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+1
-1
source/dnode/qnode/inc/qndInt.h
source/dnode/qnode/inc/qndInt.h
+1
-1
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-1
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+28
-0
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+63
-4
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+47
-7
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+2
-0
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+27
-5
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+30
-6
source/libs/qworker/inc/qworkerMsg.h
source/libs/qworker/inc/qworkerMsg.h
+3
-3
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+145
-42
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+9
-9
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+1
-0
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+12
-4
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+3
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+34
-9
tests/script/api/batchprepare.c
tests/script/api/batchprepare.c
+91
-81
未找到文件。
include/libs/nodes/querynodes.h
浏览文件 @
190b7029
...
...
@@ -88,6 +88,7 @@ typedef struct SValueNode {
double
d
;
char
*
p
;
}
datum
;
int64_t
typeData
;
char
unit
;
}
SValueNode
;
...
...
@@ -313,6 +314,7 @@ bool nodesIsTimeorderQuery(const SNode* pQuery);
bool
nodesIsTimelineQuery
(
const
SNode
*
pQuery
);
void
*
nodesGetValueFromNode
(
SValueNode
*
pNode
);
int32_t
nodesSetValueNodeValue
(
SValueNode
*
pNode
,
void
*
value
);
char
*
nodesGetStrValueFromNode
(
SValueNode
*
pNode
);
char
*
getFillModeString
(
EFillMode
mode
);
void
valueNodeToVariant
(
const
SValueNode
*
pNode
,
SVariant
*
pVal
);
...
...
source/client/inc/clientStmt.h
浏览文件 @
190b7029
...
...
@@ -46,11 +46,12 @@ typedef struct SStmtTableCache {
void
*
boundTags
;
}
SStmtTableCache
;
typedef
struct
S
QueryFields
{
typedef
struct
S
StmtQueryResInfo
{
TAOS_FIELD
*
fields
;
TAOS_FIELD
*
userFields
;
uint32_t
numOfCols
;
}
SQueryFields
;
int32_t
precision
;
}
SStmtQueryResInfo
;
typedef
struct
SStmtBindInfo
{
bool
needParse
;
...
...
@@ -72,17 +73,17 @@ typedef struct SStmtExecInfo {
}
SStmtExecInfo
;
typedef
struct
SStmtSQLInfo
{
STMT_TYPE
type
;
STMT_STATUS
status
;
bool
autoCreate
;
uint64_t
runTimes
;
SHashObj
*
pTableCache
;
//SHash<SStmtTableCache>
SQuery
*
pQuery
;
char
*
sqlStr
;
int32_t
sqlLen
;
SArray
*
nodeList
;
SQueryPlan
*
pQueryPlan
;
S
QueryFields
field
s
;
STMT_TYPE
type
;
STMT_STATUS
status
;
bool
autoCreate
;
uint64_t
runTimes
;
SHashObj
*
pTableCache
;
//SHash<SStmtTableCache>
SQuery
*
pQuery
;
char
*
sqlStr
;
int32_t
sqlLen
;
SArray
*
nodeList
;
SQueryPlan
*
pQueryPlan
;
S
StmtQueryResInfo
queryRe
s
;
}
SStmtSQLInfo
;
typedef
struct
STscStmt
{
...
...
source/client/src/clientStmt.c
浏览文件 @
190b7029
...
...
@@ -74,17 +74,44 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
}
int32_t
stmtBackupQueryFields
(
STscStmt
*
pStmt
)
{
SQueryFields
*
pFields
=
&
pStmt
->
sql
.
fields
;
int32_t
size
=
pFields
->
numOfCols
*
sizeof
(
TAOS_FIELD
);
SStmtQueryResInfo
*
pRes
=
&
pStmt
->
sql
.
queryRes
;
pRes
->
numOfCols
=
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
numOfCols
;
pRes
->
precision
=
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
precision
;
pFields
->
numOfCols
=
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
numOfCols
;
p
Field
s
->
fields
=
taosMemoryMalloc
(
size
);
p
Field
s
->
userFields
=
taosMemoryMalloc
(
size
);
if
(
NULL
==
p
Fields
->
fields
||
NULL
==
pField
s
->
userFields
)
{
int32_t
size
=
pRes
->
numOfCols
*
sizeof
(
TAOS_FIELD
)
;
p
Re
s
->
fields
=
taosMemoryMalloc
(
size
);
p
Re
s
->
userFields
=
taosMemoryMalloc
(
size
);
if
(
NULL
==
p
Res
->
fields
||
NULL
==
pRe
s
->
userFields
)
{
STMT_ERR_RET
(
TSDB_CODE_TSC_OUT_OF_MEMORY
);
}
memcpy
(
pFields
->
fields
,
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
fields
,
size
);
memcpy
(
pFields
->
userFields
,
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
userFields
,
size
);
memcpy
(
pRes
->
fields
,
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
fields
,
size
);
memcpy
(
pRes
->
userFields
,
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
userFields
,
size
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
stmtRestoreQueryFields
(
STscStmt
*
pStmt
)
{
SStmtQueryResInfo
*
pRes
=
&
pStmt
->
sql
.
queryRes
;
int32_t
size
=
pRes
->
numOfCols
*
sizeof
(
TAOS_FIELD
);
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
numOfCols
=
pRes
->
numOfCols
;
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
precision
=
pRes
->
precision
;
if
(
NULL
==
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
fields
)
{
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
fields
=
taosMemoryMalloc
(
size
);
if
(
NULL
==
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
fields
)
{
STMT_ERR_RET
(
TSDB_CODE_TSC_OUT_OF_MEMORY
);
}
memcpy
(
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
fields
,
pRes
->
fields
,
size
);
}
if
(
NULL
==
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
userFields
)
{
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
userFields
=
taosMemoryMalloc
(
size
);
if
(
NULL
==
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
userFields
)
{
STMT_ERR_RET
(
TSDB_CODE_TSC_OUT_OF_MEMORY
);
}
memcpy
(
pStmt
->
exec
.
pRequest
->
body
.
resInfo
.
userFields
,
pRes
->
userFields
,
size
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -235,6 +262,8 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
}
int32_t
stmtCleanSQLInfo
(
STscStmt
*
pStmt
)
{
taosMemoryFree
(
pStmt
->
sql
.
queryRes
.
fields
);
taosMemoryFree
(
pStmt
->
sql
.
queryRes
.
userFields
);
taosMemoryFree
(
pStmt
->
sql
.
sqlStr
);
qDestroyQuery
(
pStmt
->
sql
.
pQuery
);
qDestroyQueryPlan
(
pStmt
->
sql
.
pQueryPlan
);
...
...
@@ -497,6 +526,8 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) {
pStmt
->
sql
.
pQueryPlan
=
pStmt
->
exec
.
pRequest
->
body
.
pDag
;
pStmt
->
exec
.
pRequest
->
body
.
pDag
=
NULL
;
STMT_ERR_RET
(
stmtBackupQueryFields
(
pStmt
));
}
else
{
STMT_ERR_RET
(
stmtRestoreQueryFields
(
pStmt
));
}
STMT_RET
(
qStmtBindParam
(
pStmt
->
sql
.
pQueryPlan
,
bind
,
colIdx
,
pStmt
->
exec
.
pRequest
->
requestId
));
...
...
@@ -509,7 +540,11 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) {
}
if
(
colIdx
<
0
)
{
qBindStmtColsValue
(
*
pDataBlock
,
bind
,
pStmt
->
exec
.
pRequest
->
msgBuf
,
pStmt
->
exec
.
pRequest
->
msgBufLen
);
int32_t
code
=
qBindStmtColsValue
(
*
pDataBlock
,
bind
,
pStmt
->
exec
.
pRequest
->
msgBuf
,
pStmt
->
exec
.
pRequest
->
msgBufLen
);
if
(
code
)
{
tscError
(
"qBindStmtColsValue failed, error:%s"
,
tstrerror
(
code
));
STMT_ERR_RET
(
code
);
}
}
else
{
if
(
colIdx
!=
(
pStmt
->
bInfo
.
sBindLastIdx
+
1
)
&&
colIdx
!=
0
)
{
tscError
(
"bind column index not in sequence"
);
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
190b7029
...
...
@@ -47,7 +47,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef
void
(
*
MndCleanupFp
)(
SMnode
*
pMnode
);
typedef
int32_t
(
*
ShowRetrieveFp
)(
SNodeMsg
*
pMsg
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
typedef
void
(
*
ShowFreeIterFp
)(
SMnode
*
pMnode
,
void
*
pIter
);
typedef
struct
SQWorker
Mgmt
SQHandle
;
typedef
struct
SQWorker
SQHandle
;
typedef
struct
{
const
char
*
name
;
...
...
source/dnode/qnode/inc/qndInt.h
浏览文件 @
190b7029
...
...
@@ -29,7 +29,7 @@
extern
"C"
{
#endif
typedef
struct
SQWorker
Mgmt
SQHandle
;
typedef
struct
SQWorker
SQHandle
;
typedef
struct
SQnode
{
int32_t
qndId
;
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
190b7029
...
...
@@ -52,7 +52,7 @@ typedef struct STsdb STsdb;
typedef
struct
STQ
STQ
;
typedef
struct
SVState
SVState
;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SQWorker
Mgmt
SQHandle
;
typedef
struct
SQWorker
SQHandle
;
#define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb"
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
190b7029
...
...
@@ -1911,23 +1911,51 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
break
;
case
TSDB_DATA_TYPE_BOOL
:
code
=
tjsonGetBoolValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
b
);
*
(
bool
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
b
;
break
;
case
TSDB_DATA_TYPE_TINYINT
:
code
=
tjsonGetBigIntValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
i
);
*
(
int8_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
i
;
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
code
=
tjsonGetBigIntValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
i
);
*
(
int16_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
i
;
break
;
case
TSDB_DATA_TYPE_INT
:
code
=
tjsonGetBigIntValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
i
);
*
(
int32_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
i
;
break
;
case
TSDB_DATA_TYPE_BIGINT
:
code
=
tjsonGetBigIntValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
i
);
*
(
int64_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
i
;
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
code
=
tjsonGetBigIntValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
i
);
*
(
int64_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
i
;
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
code
=
tjsonGetUBigIntValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
u
);
*
(
uint8_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
u
;
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
code
=
tjsonGetUBigIntValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
u
);
*
(
uint16_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
u
;
break
;
case
TSDB_DATA_TYPE_UINT
:
code
=
tjsonGetUBigIntValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
u
);
*
(
uint32_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
u
;
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
code
=
tjsonGetUBigIntValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
u
);
*
(
uint64_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
u
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
code
=
tjsonGetDoubleValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
d
);
*
(
float
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
d
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
code
=
tjsonGetDoubleValue
(
pJson
,
jkValueDatum
,
&
pNode
->
datum
.
d
);
*
(
double
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
d
;
break
;
case
TSDB_DATA_TYPE_NCHAR
:
case
TSDB_DATA_TYPE_VARCHAR
:
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
190b7029
...
...
@@ -871,21 +871,18 @@ void nodesClearList(SNodeList* pList) {
void
*
nodesGetValueFromNode
(
SValueNode
*
pNode
)
{
switch
(
pNode
->
node
.
resType
.
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
return
(
void
*
)
&
pNode
->
datum
.
b
;
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_TIMESTAMP
:
return
(
void
*
)
&
pNode
->
datum
.
i
;
case
TSDB_DATA_TYPE_UTINYINT
:
case
TSDB_DATA_TYPE_USMALLINT
:
case
TSDB_DATA_TYPE_UINT
:
case
TSDB_DATA_TYPE_UBIGINT
:
return
(
void
*
)
&
pNode
->
datum
.
u
;
case
TSDB_DATA_TYPE_FLOAT
:
case
TSDB_DATA_TYPE_DOUBLE
:
return
(
void
*
)
&
pNode
->
datum
.
d
;
return
(
void
*
)
&
pNode
->
typeData
;
case
TSDB_DATA_TYPE_NCHAR
:
case
TSDB_DATA_TYPE_VARCHAR
:
case
TSDB_DATA_TYPE_VARBINARY
:
...
...
@@ -897,6 +894,68 @@ void* nodesGetValueFromNode(SValueNode* pNode) {
return
NULL
;
}
int32_t
nodesSetValueNodeValue
(
SValueNode
*
pNode
,
void
*
value
)
{
switch
(
pNode
->
node
.
resType
.
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
pNode
->
datum
.
b
=
*
(
bool
*
)
value
;
*
(
bool
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
b
;
break
;
case
TSDB_DATA_TYPE_TINYINT
:
pNode
->
datum
.
i
=
*
(
int8_t
*
)
value
;
*
(
int8_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
i
;
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
pNode
->
datum
.
i
=
*
(
int16_t
*
)
value
;
*
(
int16_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
i
;
break
;
case
TSDB_DATA_TYPE_INT
:
pNode
->
datum
.
i
=
*
(
int32_t
*
)
value
;
*
(
int32_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
i
;
break
;
case
TSDB_DATA_TYPE_BIGINT
:
pNode
->
datum
.
i
=
*
(
int64_t
*
)
value
;
*
(
int64_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
i
;
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
pNode
->
datum
.
i
=
*
(
int64_t
*
)
value
;
*
(
int64_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
i
;
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
pNode
->
datum
.
u
=
*
(
int8_t
*
)
value
;
*
(
int8_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
u
;
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
pNode
->
datum
.
u
=
*
(
int16_t
*
)
value
;
*
(
int16_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
u
;
break
;
case
TSDB_DATA_TYPE_UINT
:
pNode
->
datum
.
u
=
*
(
int32_t
*
)
value
;
*
(
int32_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
u
;
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
pNode
->
datum
.
u
=
*
(
uint64_t
*
)
value
;
*
(
uint64_t
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
u
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
pNode
->
datum
.
d
=
*
(
float
*
)
value
;
*
(
float
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
d
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
pNode
->
datum
.
d
=
*
(
double
*
)
value
;
*
(
double
*
)
&
pNode
->
typeData
=
pNode
->
datum
.
d
;
break
;
case
TSDB_DATA_TYPE_NCHAR
:
case
TSDB_DATA_TYPE_VARCHAR
:
case
TSDB_DATA_TYPE_VARBINARY
:
pNode
->
datum
.
p
=
(
char
*
)
value
;
break
;
default:
return
TSDB_CODE_QRY_APP_ERROR
;
}
return
TSDB_CODE_SUCCESS
;
}
char
*
nodesGetStrValueFromNode
(
SValueNode
*
pNode
)
{
switch
(
pNode
->
node
.
resType
.
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
{
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
190b7029
...
...
@@ -466,27 +466,66 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
break
;
case
TSDB_DATA_TYPE_BOOL
:
pVal
->
datum
.
b
=
(
0
==
strcasecmp
(
pVal
->
literal
,
"true"
));
*
(
bool
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
b
;
break
;
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_TINYINT
:{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
i
=
strtoll
(
pVal
->
literal
,
&
endPtr
,
10
);
*
(
int8_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
i
;
break
;
}
case
TSDB_DATA_TYPE_SMALLINT
:{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
i
=
strtoll
(
pVal
->
literal
,
&
endPtr
,
10
);
*
(
int16_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
i
;
break
;
}
case
TSDB_DATA_TYPE_INT
:{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
i
=
strtoll
(
pVal
->
literal
,
&
endPtr
,
10
);
*
(
int32_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
i
;
break
;
}
case
TSDB_DATA_TYPE_BIGINT
:
{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
i
=
strtoll
(
pVal
->
literal
,
&
endPtr
,
10
);
*
(
int64_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
i
;
break
;
}
case
TSDB_DATA_TYPE_UTINYINT
:{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
u
=
strtoull
(
pVal
->
literal
,
&
endPtr
,
10
);
*
(
uint8_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
u
;
break
;
}
case
TSDB_DATA_TYPE_USMALLINT
:{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
u
=
strtoull
(
pVal
->
literal
,
&
endPtr
,
10
);
*
(
uint16_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
u
;
break
;
}
case
TSDB_DATA_TYPE_UINT
:{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
u
=
strtoull
(
pVal
->
literal
,
&
endPtr
,
10
);
*
(
uint32_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
u
;
break
;
}
case
TSDB_DATA_TYPE_UTINYINT
:
case
TSDB_DATA_TYPE_USMALLINT
:
case
TSDB_DATA_TYPE_UINT
:
case
TSDB_DATA_TYPE_UBIGINT
:
{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
u
=
strtoull
(
pVal
->
literal
,
&
endPtr
,
10
);
*
(
uint64_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
u
;
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
d
=
strtold
(
pVal
->
literal
,
&
endPtr
);
*
(
float
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
d
;
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
case
TSDB_DATA_TYPE_DOUBLE
:
{
char
*
endPtr
=
NULL
;
pVal
->
datum
.
d
=
strtold
(
pVal
->
literal
,
&
endPtr
);
*
(
double
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
d
;
break
;
}
case
TSDB_DATA_TYPE_VARCHAR
:
...
...
@@ -504,6 +543,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
TSDB_CODE_SUCCESS
)
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
pVal
->
literal
);
}
*
(
int64_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
i
;
break
;
}
case
TSDB_DATA_TYPE_NCHAR
:
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
190b7029
...
...
@@ -206,6 +206,8 @@ static int32_t cpdMergeCond(SNode** pDst, SNode** pSrc) {
if
(
NULL
==
pLogicCond
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pLogicCond
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_BOOL
;
pLogicCond
->
node
.
resType
.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_BOOL
].
bytes
;
pLogicCond
->
condType
=
LOGIC_COND_TYPE_AND
;
int32_t
code
=
nodesListMakeAppend
(
&
pLogicCond
->
pParameterList
,
*
pSrc
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
source/libs/planner/src/planner.c
浏览文件 @
190b7029
...
...
@@ -104,8 +104,9 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
pVal
->
node
.
resType
.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_NULL
].
bytes
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
inputSize
=
(
NULL
!=
pParam
->
length
?
*
(
pParam
->
length
)
:
tDataTypes
[
pParam
->
buffer_type
].
bytes
);
pVal
->
node
.
resType
.
type
=
pParam
->
buffer_type
;
pVal
->
node
.
resType
.
bytes
=
NULL
!=
pParam
->
length
?
*
(
pParam
->
length
)
:
tDataTypes
[
pParam
->
buffer_type
].
bytes
;
pVal
->
node
.
resType
.
bytes
=
inputSize
;
switch
(
pParam
->
buffer_type
)
{
case
TSDB_DATA_TYPE_BOOL
:
pVal
->
datum
.
b
=
*
((
bool
*
)
pParam
->
buffer
);
...
...
@@ -130,7 +131,6 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
break
;
case
TSDB_DATA_TYPE_VARCHAR
:
case
TSDB_DATA_TYPE_VARBINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
pVal
->
datum
.
p
=
taosMemoryCalloc
(
1
,
pVal
->
node
.
resType
.
bytes
+
VARSTR_HEADER_SIZE
+
1
);
if
(
NULL
==
pVal
->
datum
.
p
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -138,6 +138,21 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
varDataSetLen
(
pVal
->
datum
.
p
,
pVal
->
node
.
resType
.
bytes
);
strncpy
(
varDataVal
(
pVal
->
datum
.
p
),
(
const
char
*
)
pParam
->
buffer
,
pVal
->
node
.
resType
.
bytes
);
break
;
case
TSDB_DATA_TYPE_NCHAR
:
{
pVal
->
node
.
resType
.
bytes
*=
TSDB_NCHAR_SIZE
;
pVal
->
datum
.
p
=
taosMemoryCalloc
(
1
,
pVal
->
node
.
resType
.
bytes
+
VARSTR_HEADER_SIZE
+
1
);
if
(
NULL
==
pVal
->
datum
.
p
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
output
=
0
;
if
(
!
taosMbsToUcs4
(
pParam
->
buffer
,
inputSize
,
(
TdUcs4
*
)
varDataVal
(
pVal
->
datum
.
p
),
pVal
->
node
.
resType
.
bytes
,
&
output
))
{
return
errno
;
}
varDataSetLen
(
pVal
->
datum
.
p
,
output
);
pVal
->
node
.
resType
.
bytes
=
output
;
break
;
}
case
TSDB_DATA_TYPE_TIMESTAMP
:
pVal
->
datum
.
i
=
*
((
int64_t
*
)
pParam
->
buffer
);
break
;
...
...
@@ -181,13 +196,20 @@ static EDealRes updatePlanQueryId(SNode* pNode, void* pContext) {
int32_t
qStmtBindParam
(
SQueryPlan
*
pPlan
,
TAOS_MULTI_BIND
*
pParams
,
int32_t
colIdx
,
uint64_t
queryId
)
{
int32_t
size
=
taosArrayGetSize
(
pPlan
->
pPlaceholderValues
);
int32_t
code
=
0
;
if
(
colIdx
<
0
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
setValueByBindParam
((
SValueNode
*
)
taosArrayGetP
(
pPlan
->
pPlaceholderValues
,
i
),
pParams
+
i
);
code
=
setValueByBindParam
((
SValueNode
*
)
taosArrayGetP
(
pPlan
->
pPlaceholderValues
,
i
),
pParams
+
i
);
if
(
code
)
{
return
code
;
}
}
}
else
{
setValueByBindParam
((
SValueNode
*
)
taosArrayGetP
(
pPlan
->
pPlaceholderValues
,
colIdx
),
pParams
);
code
=
setValueByBindParam
((
SValueNode
*
)
taosArrayGetP
(
pPlan
->
pPlaceholderValues
,
colIdx
),
pParams
);
if
(
code
)
{
return
code
;
}
}
if
(
colIdx
<
0
||
((
colIdx
+
1
)
==
size
))
{
...
...
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
190b7029
...
...
@@ -23,6 +23,7 @@ extern "C" {
#include "qworker.h"
#include "tlockfree.h"
#include "ttimer.h"
#include "tref.h"
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QW_DEFAULT_TASK_NUMBER 10000
...
...
@@ -85,6 +86,12 @@ typedef struct SQWMsg {
SQWConnInfo
connInfo
;
}
SQWMsg
;
typedef
struct
SQWHbParam
{
bool
inUse
;
int32_t
qwrId
;
int64_t
refId
;
}
SQWHbParam
;
typedef
struct
SQWHbInfo
{
SSchedulerHbRsp
rsp
;
SQWConnInfo
connInfo
;
...
...
@@ -137,7 +144,8 @@ typedef struct SQWSchStatus {
}
SQWSchStatus
;
// Qnode/Vnode level task management
typedef
struct
SQWorkerMgmt
{
typedef
struct
SQWorker
{
int64_t
refId
;
SQWorkerCfg
cfg
;
int8_t
nodeType
;
int32_t
nodeId
;
...
...
@@ -148,9 +156,17 @@ typedef struct SQWorkerMgmt {
SHashObj
*
schHash
;
// key: schedulerId, value: SQWSchStatus
SHashObj
*
ctxHash
;
// key: queryId+taskId, value: SQWTaskCtx
SMsgCb
msgCb
;
}
SQWorker
;
typedef
struct
SQWorkerMgmt
{
SRWLatch
lock
;
int32_t
qwRef
;
int32_t
qwNum
;
SQWHbParam
param
[
1024
];
int32_t
paramIdx
;
}
SQWorkerMgmt
;
#define QW_FPARAMS_DEF SQWorker
Mgmt
*mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
#define QW_IDS() sId, qId, tId, rId
#define QW_FPARAMS() mgmt, QW_IDS()
...
...
@@ -209,13 +225,13 @@ typedef struct SQWorkerMgmt {
} \
} while (0)
#define QW_ELOG(
param, ...) qError("QW:%p "
param, mgmt, __VA_ARGS__)
#define QW_DLOG(
param, ...) qDebug("QW:%p "
param, mgmt, __VA_ARGS__)
#define QW_ELOG(
_param, ...) qError("QW:%p " _
param, mgmt, __VA_ARGS__)
#define QW_DLOG(
_param, ...) qDebug("QW:%p " _
param, mgmt, __VA_ARGS__)
#define QW_DUMP(param, ...) \
#define QW_DUMP(
_
param, ...) \
do { \
if (gQWDebug.dumpEnable) { \
qDebug("QW:%p " param, mgmt, __VA_ARGS__); \
qDebug("QW:%p "
_
param, mgmt, __VA_ARGS__); \
} \
} while (0)
...
...
@@ -282,6 +298,14 @@ typedef struct SQWorkerMgmt {
} \
} while (0)
extern
SQWorkerMgmt
gQwMgmt
;
FORCE_INLINE
SQWorker
*
qwAcquire
(
int64_t
refId
)
{
return
(
SQWorker
*
)
taosAcquireRef
(
atomic_load_32
(
&
gQwMgmt
.
qwRef
),
refId
);
}
FORCE_INLINE
int32_t
qwRelease
(
int64_t
refId
)
{
return
taosReleaseRef
(
gQwMgmt
.
qwRef
,
refId
);
}
#ifdef __cplusplus
}
#endif
...
...
source/libs/qworker/inc/qworkerMsg.h
浏览文件 @
190b7029
...
...
@@ -28,7 +28,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t
qwProcessReady
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessFetch
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessDrop
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessHb
(
SQWorker
Mgmt
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
);
int32_t
qwProcessHb
(
SQWorker
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
);
int32_t
qwBuildAndSendDropRsp
(
SQWConnInfo
*
pConn
,
int32_t
code
);
int32_t
qwBuildAndSendCancelRsp
(
SQWConnInfo
*
pConn
,
int32_t
code
);
...
...
@@ -41,10 +41,10 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code);
int32_t
qwBuildAndSendExplainRsp
(
SQWConnInfo
*
pConn
,
SExplainExecInfo
*
execInfo
,
int32_t
num
);
void
qwFreeFetchRsp
(
void
*
msg
);
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
);
int32_t
qwGetSchTasksStatus
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
SSchedulerStatusRsp
**
rsp
);
int32_t
qwGetSchTasksStatus
(
SQWorker
*
mgmt
,
uint64_t
sId
,
SSchedulerStatusRsp
**
rsp
);
int32_t
qwBuildAndSendHbRsp
(
SQWConnInfo
*
pConn
,
SSchedulerHbRsp
*
rsp
,
int32_t
code
);
int32_t
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS_DEF
,
SQWConnInfo
*
pConn
);
int32_t
qwRegisterHbBrokenLinkArg
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
SQWConnInfo
*
pConn
);
int32_t
qwRegisterHbBrokenLinkArg
(
SQWorker
*
mgmt
,
uint64_t
sId
,
SQWConnInfo
*
pConn
);
#ifdef __cplusplus
}
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
190b7029
...
...
@@ -10,6 +10,11 @@
#include "tname.h"
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
true
};
SQWorkerMgmt
gQwMgmt
=
{
.
lock
=
0
,
.
qwRef
=
-
1
,
.
qwNum
=
0
,
};
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
)
{
if
(
!
gQWDebug
.
statusEnable
)
{
...
...
@@ -98,7 +103,7 @@ _return:
void
qwDbgDumpSchInfo
(
SQWSchStatus
*
sch
,
int32_t
i
)
{}
void
qwDbgDumpMgmtInfo
(
SQWorker
Mgmt
*
mgmt
)
{
void
qwDbgDumpMgmtInfo
(
SQWorker
*
mgmt
)
{
if
(
!
gQWDebug
.
dumpEnable
)
{
return
;
}
...
...
@@ -186,7 +191,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAddSchedulerImpl
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
)
{
int32_t
qwAddSchedulerImpl
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
)
{
SQWSchStatus
newSch
=
{
0
};
newSch
.
tasksHash
=
taosHashInit
(
mgmt
->
cfg
.
maxSchTaskNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
...
...
@@ -213,7 +218,7 @@ int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAcquireSchedulerImpl
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
,
int32_t
nOpt
)
{
int32_t
qwAcquireSchedulerImpl
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
,
int32_t
nOpt
)
{
while
(
true
)
{
QW_LOCK
(
rwType
,
&
mgmt
->
schLock
);
*
sch
=
taosHashGet
(
mgmt
->
schHash
,
&
sId
,
sizeof
(
sId
));
...
...
@@ -240,15 +245,15 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType,
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAcquireAddScheduler
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
)
{
int32_t
qwAcquireAddScheduler
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
)
{
return
qwAcquireSchedulerImpl
(
mgmt
,
sId
,
rwType
,
sch
,
QW_NOT_EXIST_ADD
);
}
int32_t
qwAcquireScheduler
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
)
{
int32_t
qwAcquireScheduler
(
SQWorker
*
mgmt
,
uint64_t
sId
,
int32_t
rwType
,
SQWSchStatus
**
sch
)
{
return
qwAcquireSchedulerImpl
(
mgmt
,
sId
,
rwType
,
sch
,
QW_NOT_EXIST_RET_ERR
);
}
void
qwReleaseScheduler
(
int32_t
rwType
,
SQWorker
Mgmt
*
mgmt
)
{
QW_UNLOCK
(
rwType
,
&
mgmt
->
schLock
);
}
void
qwReleaseScheduler
(
int32_t
rwType
,
SQWorker
*
mgmt
)
{
QW_UNLOCK
(
rwType
,
&
mgmt
->
schLock
);
}
int32_t
qwAcquireTaskStatus
(
QW_FPARAMS_DEF
,
int32_t
rwType
,
SQWSchStatus
*
sch
,
SQWTaskStatus
**
task
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
...
...
@@ -384,7 +389,7 @@ int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), fal
int32_t
qwAddAcquireTaskCtx
(
QW_FPARAMS_DEF
,
SQWTaskCtx
**
ctx
)
{
return
qwAddTaskCtxImpl
(
QW_FPARAMS
(),
true
,
ctx
);
}
void
qwReleaseTaskCtx
(
SQWorker
Mgmt
*
mgmt
,
void
*
ctx
)
{
taosHashRelease
(
mgmt
->
ctxHash
,
ctx
);
}
void
qwReleaseTaskCtx
(
SQWorker
*
mgmt
,
void
*
ctx
)
{
taosHashRelease
(
mgmt
->
ctxHash
,
ctx
);
}
void
qwFreeTaskHandle
(
QW_FPARAMS_DEF
,
qTaskInfo_t
*
taskHandle
)
{
// Note: free/kill may in RC
...
...
@@ -606,7 +611,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_RET
(
code
);
}
int32_t
qwGenerateSchHbRsp
(
SQWorker
Mgmt
*
mgmt
,
SQWSchStatus
*
sch
,
SQWHbInfo
*
hbInfo
)
{
int32_t
qwGenerateSchHbRsp
(
SQWorker
*
mgmt
,
SQWSchStatus
*
sch
,
SQWHbInfo
*
hbInfo
)
{
int32_t
taskNum
=
0
;
hbInfo
->
connInfo
=
sch
->
hbConnInfo
;
...
...
@@ -1262,7 +1267,7 @@ _return:
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qwProcessHbLinkBroken
(
SQWorker
Mgmt
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
qwProcessHbLinkBroken
(
SQWorker
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
SQWSchStatus
*
sch
=
NULL
;
...
...
@@ -1288,7 +1293,7 @@ int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qwProcessHb
(
SQWorker
Mgmt
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
qwProcessHb
(
SQWorker
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
SQWSchStatus
*
sch
=
NULL
;
...
...
@@ -1333,7 +1338,19 @@ _return:
}
void
qwProcessHbTimerEvent
(
void
*
param
,
void
*
tmrId
)
{
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
param
;
SQWHbParam
*
hbParam
=
(
SQWHbParam
*
)
param
;
if
(
hbParam
->
qwrId
!=
atomic_load_32
(
&
gQwMgmt
.
qwRef
))
{
return
;
}
int64_t
refId
=
hbParam
->
refId
;
SQWorker
*
mgmt
=
qwAcquire
(
refId
);
if
(
NULL
==
mgmt
)
{
QW_DLOG
(
"qwAcquire %"
PRIx64
"failed"
,
refId
);
taosMemoryFree
(
param
);
return
;
}
SQWSchStatus
*
sch
=
NULL
;
int32_t
taskNum
=
0
;
SQWHbInfo
*
rspList
=
NULL
;
...
...
@@ -1347,6 +1364,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
if
(
schNum
<=
0
)
{
QW_UNLOCK
(
QW_READ
,
&
mgmt
->
schLock
);
taosTmrReset
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
param
,
mgmt
->
timer
,
&
mgmt
->
hbTimer
);
qwRelease
(
refId
);
return
;
}
...
...
@@ -1355,6 +1373,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
QW_UNLOCK
(
QW_READ
,
&
mgmt
->
schLock
);
QW_ELOG
(
"calloc %d SQWHbInfo failed"
,
schNum
);
taosTmrReset
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
param
,
mgmt
->
timer
,
&
mgmt
->
hbTimer
);
qwRelease
(
refId
);
return
;
}
...
...
@@ -1396,6 +1415,74 @@ _return:
taosMemoryFreeClear
(
rspList
);
taosTmrReset
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
param
,
mgmt
->
timer
,
&
mgmt
->
hbTimer
);
qwRelease
(
refId
);
}
void
qwCloseRef
(
void
)
{
taosWLockLatch
(
&
gQwMgmt
.
lock
);
if
(
atomic_load_32
(
&
gQwMgmt
.
qwNum
)
<=
0
&&
gQwMgmt
.
qwRef
>=
0
)
{
taosCloseRef
(
gQwMgmt
.
qwRef
);
gQwMgmt
.
qwRef
=
-
1
;
}
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
}
void
qwDestroyImpl
(
void
*
pMgmt
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
pMgmt
;
taosTmrStopA
(
&
mgmt
->
hbTimer
);
taosTmrCleanUp
(
mgmt
->
timer
);
// TODO STOP ALL QUERY
// TODO FREE ALL
taosHashCleanup
(
mgmt
->
ctxHash
);
taosHashCleanup
(
mgmt
->
schHash
);
taosMemoryFree
(
mgmt
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
qwCloseRef
();
}
int32_t
qwOpenRef
(
void
)
{
taosWLockLatch
(
&
gQwMgmt
.
lock
);
if
(
gQwMgmt
.
qwRef
<
0
)
{
gQwMgmt
.
qwRef
=
taosOpenRef
(
100
,
qwDestroyImpl
);
if
(
gQwMgmt
.
qwRef
<
0
)
{
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
qError
(
"init qworker ref failed"
);
QW_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
return
TSDB_CODE_SUCCESS
;
}
void
qwSetHbParam
(
int64_t
refId
,
SQWHbParam
**
pParam
)
{
int32_t
paramIdx
=
0
;
int32_t
newParamIdx
=
0
;
while
(
true
)
{
paramIdx
=
atomic_load_32
(
&
gQwMgmt
.
paramIdx
);
if
(
paramIdx
==
tListLen
(
gQwMgmt
.
param
))
{
newParamIdx
=
0
;
}
else
{
newParamIdx
=
paramIdx
+
1
;
}
if
(
paramIdx
==
atomic_val_compare_exchange_32
(
&
gQwMgmt
.
paramIdx
,
paramIdx
,
newParamIdx
))
{
break
;
}
}
gQwMgmt
.
param
[
paramIdx
].
qwrId
=
gQwMgmt
.
qwRef
;
gQwMgmt
.
param
[
paramIdx
].
refId
=
refId
;
*
pParam
=
&
gQwMgmt
.
param
[
paramIdx
];
}
int32_t
qWorkerInit
(
int8_t
nodeType
,
int32_t
nodeId
,
SQWorkerCfg
*
cfg
,
void
**
qWorkerMgmt
,
const
SMsgCb
*
pMsgCb
)
{
...
...
@@ -1404,10 +1491,21 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
QW_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
SQWorkerMgmt
*
mgmt
=
taosMemoryCalloc
(
1
,
sizeof
(
SQWorkerMgmt
));
int32_t
qwNum
=
atomic_add_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
if
(
1
==
qwNum
)
{
memset
(
gQwMgmt
.
param
,
0
,
sizeof
(
gQwMgmt
.
param
));
}
int32_t
code
=
qwOpenRef
();
if
(
code
)
{
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
QW_RET
(
code
);
}
SQWorker
*
mgmt
=
taosMemoryCalloc
(
1
,
sizeof
(
SQWorker
));
if
(
NULL
==
mgmt
)
{
qError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SQWorkerMgmt
));
qError
(
"calloc %d failed"
,
(
int32_t
)
sizeof
(
SQWorker
));
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
QW_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -1449,16 +1547,25 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
QW_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
mgmt
->
hbTimer
=
taosTmrStart
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
mgmt
,
mgmt
->
timer
);
if
(
NULL
==
mgmt
->
hbTimer
)
{
qError
(
"start hb timer failed"
);
QW_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
mgmt
->
nodeType
=
nodeType
;
mgmt
->
nodeId
=
nodeId
;
mgmt
->
msgCb
=
*
pMsgCb
;
mgmt
->
refId
=
taosAddRef
(
gQwMgmt
.
qwRef
,
mgmt
);
if
(
mgmt
->
refId
<
0
)
{
qError
(
"taosAddRef qw failed, error:%s"
,
tstrerror
(
terrno
));
QW_ERR_JRET
(
terrno
);
}
SQWHbParam
*
param
=
NULL
;
qwSetHbParam
(
mgmt
->
refId
,
&
param
);
mgmt
->
hbTimer
=
taosTmrStart
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
(
void
*
)
param
,
mgmt
->
timer
);
if
(
NULL
==
mgmt
->
hbTimer
)
{
qError
(
"start hb timer failed"
);
QW_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
*
qWorkerMgmt
=
mgmt
;
qDebug
(
"qworker initialized for node, type:%d, id:%d, handle:%p"
,
mgmt
->
nodeType
,
mgmt
->
nodeId
,
mgmt
);
...
...
@@ -1467,13 +1574,17 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
_return:
taosHashCleanup
(
mgmt
->
schHash
);
taosHashCleanup
(
mgmt
->
ctxHash
);
taosTmrCleanUp
(
mgmt
->
timer
);
taosMemoryFreeClear
(
mgmt
);
if
(
mgmt
->
refId
>=
0
)
{
qwRelease
(
mgmt
->
refId
);
}
else
{
taosHashCleanup
(
mgmt
->
schHash
);
taosHashCleanup
(
mgmt
->
ctxHash
);
taosTmrCleanUp
(
mgmt
->
timer
);
taosMemoryFreeClear
(
mgmt
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
}
QW_RET
(
code
);
}
...
...
@@ -1482,22 +1593,14 @@ void qWorkerDestroy(void **qWorkerMgmt) {
return
;
}
SQWorker
Mgmt
*
mgmt
=
*
qWorkerMgmt
;
SQWorker
*
mgmt
=
*
qWorkerMgmt
;
taosTmrStopA
(
&
mgmt
->
hbTimer
);
taosTmrCleanUp
(
mgmt
->
timer
);
// TODO STOP ALL QUERY
// TODO FREE ALL
taosHashCleanup
(
mgmt
->
ctxHash
);
taosHashCleanup
(
mgmt
->
schHash
);
taosMemoryFreeClear
(
*
qWorkerMgmt
);
if
(
taosRemoveRef
(
gQwMgmt
.
qwRef
,
mgmt
->
refId
))
{
qError
(
"remove qw from ref list failed, refId:%"
PRIx64
,
mgmt
->
refId
);
}
}
int32_t
qwGetSchTasksStatus
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
SSchedulerStatusRsp
**
rsp
)
{
int32_t
qwGetSchTasksStatus
(
SQWorker
*
mgmt
,
uint64_t
sId
,
SSchedulerStatusRsp
**
rsp
)
{
/*
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
...
...
@@ -1544,7 +1647,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwUpdateSchLastAccess
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
)
{
int32_t
qwUpdateSchLastAccess
(
SQWorker
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
)
{
SQWSchStatus
*
sch
=
NULL
;
/*
...
...
@@ -1557,7 +1660,7 @@ int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, ui
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwGetTaskStatus
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int8_t
*
taskStatus
)
{
int32_t
qwGetTaskStatus
(
SQWorker
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int8_t
*
taskStatus
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
...
...
@@ -1584,7 +1687,7 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_RET
(
code
);
}
int32_t
qwCancelTask
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
)
{
int32_t
qwCancelTask
(
SQWorker
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
...
...
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
190b7029
...
...
@@ -320,7 +320,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwRegisterHbBrokenLinkArg
(
SQWorker
Mgmt
*
mgmt
,
uint64_t
sId
,
SQWConnInfo
*
pConn
)
{
int32_t
qwRegisterHbBrokenLinkArg
(
SQWorker
*
mgmt
,
uint64_t
sId
,
SQWConnInfo
*
pConn
)
{
SSchedulerHbReq
req
=
{
0
};
req
.
header
.
vgId
=
mgmt
->
nodeId
;
req
.
sId
=
sId
;
...
...
@@ -363,7 +363,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
Mgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid query msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -405,7 +405,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQueryContinueReq
*
msg
=
(
SQueryContinueReq
*
)
pMsg
->
pCont
;
bool
needStop
=
false
;
SQWTaskCtx
*
handles
=
NULL
;
SQWorker
Mgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid cquery msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -436,7 +436,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SQWorker
Mgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SResReadyReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task ready msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -478,7 +478,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SQWorker
Mgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
msg
->
sId
=
htobe64
(
msg
->
sId
);
uint64_t
sId
=
msg
->
sId
;
...
...
@@ -499,7 +499,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
SResFetchReq
*
msg
=
pMsg
->
pCont
;
SQWorker
Mgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid fetch msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -539,7 +539,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SQWorker
Mgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
int32_t
code
=
0
;
STaskCancelReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
...
...
@@ -579,7 +579,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
STaskDropReq
*
msg
=
pMsg
->
pCont
;
SQWorker
Mgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task drop msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -621,7 +621,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
SSchedulerHbReq
req
=
{
0
};
SQWorker
Mgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
pMsg
->
pCont
)
{
QW_ELOG
(
"invalid hb msg, msg:%p, msgLen:%d"
,
pMsg
->
pCont
,
pMsg
->
contLen
);
...
...
source/libs/scalar/src/filter.c
浏览文件 @
190b7029
...
...
@@ -3765,6 +3765,7 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options)
FLT_ERR_JRET
(
fltReviseNodes
(
info
,
&
pNode
,
&
stat
));
info
->
scalarMode
=
stat
.
scalarMode
;
fltDebug
(
"scalar mode: %d"
,
info
->
scalarMode
);
if
(
!
info
->
scalarMode
)
{
FLT_ERR_JRET
(
fltInitFromNode
(
pNode
,
info
,
options
));
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
190b7029
...
...
@@ -75,7 +75,15 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
if
(
valueNode
->
node
.
resType
.
type
!=
type
)
{
out
.
columnData
->
info
.
type
=
type
;
out
.
columnData
->
info
.
bytes
=
tDataTypes
[
type
].
bytes
;
if
(
IS_VAR_DATA_TYPE
(
type
))
{
if
(
IS_VAR_DATA_TYPE
(
valueNode
->
node
.
resType
.
type
))
{
out
.
columnData
->
info
.
bytes
=
valueNode
->
node
.
resType
.
bytes
*
TSDB_NCHAR_SIZE
;
}
else
{
out
.
columnData
->
info
.
bytes
=
64
*
TSDB_NCHAR_SIZE
;
}
}
else
{
out
.
columnData
->
info
.
bytes
=
tDataTypes
[
type
].
bytes
;
}
code
=
doConvertDataType
(
valueNode
,
&
out
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -598,7 +606,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
res
->
datum
.
p
=
taosMemoryCalloc
(
res
->
node
.
resType
.
bytes
+
VARSTR_HEADER_SIZE
+
1
,
1
);
memcpy
(
res
->
datum
.
p
,
output
.
columnData
->
pData
,
varDataTLen
(
output
.
columnData
->
pData
));
}
else
{
memcpy
(
nodesGetValueFromNode
(
res
),
output
.
columnData
->
pData
,
tDataTypes
[
type
].
bytes
);
nodesSetValueNodeValue
(
res
,
output
.
columnData
->
pData
);
}
}
...
...
@@ -638,7 +646,7 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
res
->
datum
.
p
=
output
.
columnData
->
pData
;
output
.
columnData
->
pData
=
NULL
;
}
else
{
memcpy
(
nodesGetValueFromNode
(
res
),
output
.
columnData
->
pData
,
tDataTypes
[
type
].
bytes
);
nodesSetValueNodeValue
(
res
,
output
.
columnData
->
pData
);
}
nodesDestroyNode
(
*
pNode
);
...
...
@@ -680,7 +688,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
res
->
datum
.
p
=
output
.
columnData
->
pData
;
output
.
columnData
->
pData
=
NULL
;
}
else
{
memcpy
(
nodesGetValueFromNode
(
res
),
output
.
columnData
->
pData
,
tDataTypes
[
type
].
bytes
);
nodesSetValueNodeValue
(
res
,
output
.
columnData
->
pData
);
}
}
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
190b7029
...
...
@@ -85,7 +85,10 @@ typedef struct SSchedulerMgmt {
uint64_t
taskId
;
// sequential taksId
uint64_t
sId
;
// schedulerId
SSchedulerCfg
cfg
;
SRWLatch
lock
;
bool
exit
;
int32_t
jobRef
;
int32_t
jobNum
;
SSchedulerStat
stat
;
SHashObj
*
hbConnections
;
}
SSchedulerMgmt
;
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
190b7029
...
...
@@ -21,7 +21,9 @@
#include "tref.h"
#include "trpc.h"
SSchedulerMgmt
schMgmt
=
{
0
};
SSchedulerMgmt
schMgmt
=
{
.
jobRef
=
-
1
,
};
FORCE_INLINE
SSchJob
*
schAcquireJob
(
int64_t
refId
)
{
return
(
SSchJob
*
)
taosAcquireRef
(
schMgmt
.
jobRef
,
refId
);
}
...
...
@@ -70,6 +72,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
int32_t
schInitJob
(
SSchJob
**
pSchJob
,
SQueryPlan
*
pDag
,
void
*
transport
,
SArray
*
pNodeList
,
const
char
*
sql
,
int64_t
startTs
,
bool
syncSchedule
)
{
int32_t
code
=
0
;
int64_t
refId
=
-
1
;
SSchJob
*
pJob
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchJob
));
if
(
NULL
==
pJob
)
{
qError
(
"QID:%"
PRIx64
" calloc %d failed"
,
pDag
->
queryId
,
(
int32_t
)
sizeof
(
SSchJob
));
...
...
@@ -114,15 +117,17 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
tsem_init
(
&
pJob
->
rspSem
,
0
,
0
);
int64_t
refId
=
taosAddRef
(
schMgmt
.
jobRef
,
pJob
);
refId
=
taosAddRef
(
schMgmt
.
jobRef
,
pJob
);
if
(
refId
<
0
)
{
SCH_JOB_ELOG
(
"taosAddRef job failed, error:%s"
,
tstrerror
(
terrno
));
SCH_ERR_JRET
(
terrno
);
}
atomic_add_fetch_32
(
&
schMgmt
.
jobNum
,
1
);
if
(
NULL
==
schAcquireJob
(
refId
))
{
SCH_JOB_ELOG
(
"schAcquireJob job failed, refId:%"
PRIx64
,
refId
);
SCH_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
SCH_
ERR_J
RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
pJob
->
refId
=
refId
;
...
...
@@ -137,7 +142,11 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
_return:
schFreeJobImpl
(
pJob
);
if
(
refId
<
0
)
{
schFreeJobImpl
(
pJob
);
}
else
{
taosRemoveRef
(
schMgmt
.
jobRef
,
refId
);
}
SCH_RET
(
code
);
}
...
...
@@ -2239,6 +2248,19 @@ int32_t schCancelJob(SSchJob *pJob) {
// TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
}
void
schCloseJobRef
(
void
)
{
if
(
!
atomic_load_8
((
int8_t
*
)
&
schMgmt
.
exit
))
{
return
;
}
SCH_LOCK
(
SCH_WRITE
,
&
schMgmt
.
lock
);
if
(
atomic_load_32
(
&
schMgmt
.
jobNum
)
<=
0
&&
schMgmt
.
jobRef
>=
0
)
{
taosCloseRef
(
schMgmt
.
jobRef
);
schMgmt
.
jobRef
=
-
1
;
}
SCH_UNLOCK
(
SCH_WRITE
,
&
schMgmt
.
lock
);
}
void
schFreeJobImpl
(
void
*
job
)
{
if
(
NULL
==
job
)
{
return
;
...
...
@@ -2284,6 +2306,10 @@ void schFreeJobImpl(void *job) {
taosMemoryFreeClear
(
pJob
);
qDebug
(
"QID:0x%"
PRIx64
" job freed, refId:%"
PRIx64
", pointer:%p"
,
queryId
,
refId
,
pJob
);
atomic_sub_fetch_32
(
&
schMgmt
.
jobNum
,
1
);
schCloseJobRef
();
}
static
int32_t
schExecJobImpl
(
void
*
transport
,
SArray
*
pNodeList
,
SQueryPlan
*
pDag
,
int64_t
*
job
,
const
char
*
sql
,
...
...
@@ -2368,7 +2394,7 @@ _return:
}
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
)
{
if
(
schMgmt
.
jobRef
)
{
if
(
schMgmt
.
jobRef
>=
0
)
{
qError
(
"scheduler already initialized"
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
...
...
@@ -2732,7 +2758,9 @@ void schedulerFreeTaskList(SArray *taskList) {
}
void
schedulerDestroy
(
void
)
{
if
(
schMgmt
.
jobRef
)
{
atomic_store_8
((
int8_t
*
)
&
schMgmt
.
exit
,
1
);
if
(
schMgmt
.
jobRef
>=
0
)
{
SSchJob
*
pJob
=
taosIterateRef
(
schMgmt
.
jobRef
,
0
);
int64_t
refId
=
0
;
...
...
@@ -2745,9 +2773,6 @@ void schedulerDestroy(void) {
pJob
=
taosIterateRef
(
schMgmt
.
jobRef
,
refId
);
}
taosCloseRef
(
schMgmt
.
jobRef
);
schMgmt
.
jobRef
=
0
;
}
if
(
schMgmt
.
hbConnections
)
{
...
...
tests/script/api/batchprepare.c
浏览文件 @
190b7029
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录