Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
928c60e2
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
928c60e2
编写于
4月 26, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode_refact1
上级
42c6f019
2ece388a
变更
31
隐藏空白更改
内联
并排
Showing
31 changed file
with
452 addition
and
495 deletion
+452
-495
include/common/tmsg.h
include/common/tmsg.h
+1
-2
include/libs/function/functionMgt.h
include/libs/function/functionMgt.h
+6
-5
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+142
-153
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+4
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+2
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+5
-24
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+0
-2
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+28
-28
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+5
-54
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+27
-25
source/dnode/mnode/impl/src/mndTelem.c
source/dnode/mnode/impl/src/mndTelem.c
+0
-1
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+16
-16
source/dnode/mnode/impl/test/sma/sma.cpp
source/dnode/mnode/impl/test/sma/sma.cpp
+4
-7
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+1
-4
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+1
-1
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+2
-1
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-1
source/libs/function/inc/functionMgtInt.h
source/libs/function/inc/functionMgtInt.h
+2
-1
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+1
-1
source/libs/function/src/functionMgt.c
source/libs/function/src/functionMgt.c
+24
-35
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+1
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+14
-0
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+22
-11
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+3
-19
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+72
-84
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+22
-4
source/libs/planner/test/planSTableTest.cpp
source/libs/planner/test/planSTableTest.cpp
+1
-1
tests/script/sh/massiveTable/compileVersion.sh
tests/script/sh/massiveTable/compileVersion.sh
+1
-1
tests/script/tsim/db/alter_option.sim
tests/script/tsim/db/alter_option.sim
+43
-0
tools/shell/inc/shellInt.h
tools/shell/inc/shellInt.h
+0
-1
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+1
-12
未找到文件。
include/common/tmsg.h
浏览文件 @
928c60e2
...
...
@@ -177,6 +177,7 @@ typedef struct SField {
char
name
[
TSDB_COL_NAME_LEN
];
uint8_t
type
;
int32_t
bytes
;
int8_t
flags
;
}
SField
;
typedef
struct
SRetention
{
...
...
@@ -302,13 +303,11 @@ typedef struct {
int32_t
ttl
;
int32_t
numOfColumns
;
int32_t
numOfTags
;
int32_t
numOfSmas
;
int32_t
commentLen
;
int32_t
ast1Len
;
int32_t
ast2Len
;
SArray
*
pColumns
;
// array of SField
SArray
*
pTags
;
// array of SField
SArray
*
pSmas
;
// array of SField
char
*
comment
;
char
*
pAst1
;
char
*
pAst2
;
...
...
include/libs/function/functionMgt.h
浏览文件 @
928c60e2
...
...
@@ -20,8 +20,8 @@
extern
"C"
{
#endif
#include "querynodes.h"
#include "function.h"
#include "querynodes.h"
typedef
enum
EFunctionType
{
// aggregate function
...
...
@@ -123,10 +123,10 @@ struct SCatalog;
typedef
struct
SFmGetFuncInfoParam
{
struct
SCatalog
*
pCtg
;
void
*
pRpc
;
const
SEpSet
*
pMgmtEps
;
char
*
pErrBuf
;
int32_t
errBufLen
;
void
*
pRpc
;
const
SEpSet
*
pMgmtEps
;
char
*
pErrBuf
;
int32_t
errBufLen
;
}
SFmGetFuncInfoParam
;
int32_t
fmFuncMgtInit
();
...
...
@@ -143,6 +143,7 @@ bool fmIsDatetimeFunc(int32_t funcId);
bool
fmIsTimelineFunc
(
int32_t
funcId
);
bool
fmIsTimeorderFunc
(
int32_t
funcId
);
bool
fmIsPseudoColumnFunc
(
int32_t
funcId
);
bool
fmIsScanPseudoColumnFunc
(
int32_t
funcId
);
bool
fmIsWindowPseudoColumnFunc
(
int32_t
funcId
);
bool
fmIsWindowClauseFunc
(
int32_t
funcId
);
bool
fmIsSpecialDataRequiredFunc
(
int32_t
funcId
);
...
...
include/libs/nodes/plannodes.h
浏览文件 @
928c60e2
...
...
@@ -20,50 +20,46 @@
extern
"C"
{
#endif
#include "querynodes.h"
#include "query.h"
#include "querynodes.h"
#include "tname.h"
typedef
struct
SLogicNode
{
ENodeType
type
;
SNodeList
*
pTargets
;
// SColumnNode
SNode
*
pConditions
;
SNodeList
*
pChildren
;
ENodeType
type
;
SNodeList
*
pTargets
;
// SColumnNode
SNode
*
pConditions
;
SNodeList
*
pChildren
;
struct
SLogicNode
*
pParent
;
int32_t
optimizedFlag
;
int32_t
optimizedFlag
;
}
SLogicNode
;
typedef
enum
EScanType
{
SCAN_TYPE_TAG
,
SCAN_TYPE_TABLE
,
SCAN_TYPE_SYSTEM_TABLE
,
SCAN_TYPE_STREAM
}
EScanType
;
typedef
enum
EScanType
{
SCAN_TYPE_TAG
=
1
,
SCAN_TYPE_TABLE
,
SCAN_TYPE_SYSTEM_TABLE
,
SCAN_TYPE_STREAM
}
EScanType
;
typedef
struct
SScanLogicNode
{
SLogicNode
node
;
SNodeList
*
pScanCols
;
SLogicNode
node
;
SNodeList
*
pScanCols
;
SNodeList
*
pScanPseudoCols
;
struct
STableMeta
*
pMeta
;
SVgroupsInfo
*
pVgroupList
;
EScanType
scanType
;
uint8_t
scanSeq
[
2
];
// first is scan count, and second is reverse scan count
STimeWindow
scanRange
;
SName
tableName
;
bool
showRewrite
;
double
ratio
;
SNodeList
*
pDynamicScanFuncs
;
int32_t
dataRequired
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
SVgroupsInfo
*
pVgroupList
;
EScanType
scanType
;
uint8_t
scanSeq
[
2
];
// first is scan count, and second is reverse scan count
STimeWindow
scanRange
;
SName
tableName
;
bool
showRewrite
;
double
ratio
;
SNodeList
*
pDynamicScanFuncs
;
int32_t
dataRequired
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
}
SScanLogicNode
;
typedef
struct
SJoinLogicNode
{
SLogicNode
node
;
EJoinType
joinType
;
SNode
*
pOnConditions
;
EJoinType
joinType
;
SNode
*
pOnConditions
;
}
SJoinLogicNode
;
typedef
struct
SAggLogicNode
{
...
...
@@ -75,47 +71,43 @@ typedef struct SAggLogicNode {
typedef
struct
SProjectLogicNode
{
SLogicNode
node
;
SNodeList
*
pProjections
;
char
stmtName
[
TSDB_TABLE_NAME_LEN
];
int64_t
limit
;
int64_t
offset
;
int64_t
slimit
;
int64_t
soffset
;
char
stmtName
[
TSDB_TABLE_NAME_LEN
];
int64_t
limit
;
int64_t
offset
;
int64_t
slimit
;
int64_t
soffset
;
}
SProjectLogicNode
;
typedef
struct
SVnodeModifLogicNode
{
SLogicNode
node
;
int32_t
msgType
;
SArray
*
pDataBlocks
;
SLogicNode
node
;
int32_t
msgType
;
SArray
*
pDataBlocks
;
SVgDataBlocks
*
pVgDataBlocks
;
}
SVnodeModifLogicNode
;
typedef
struct
SExchangeLogicNode
{
SLogicNode
node
;
int32_t
srcGroupId
;
uint8_t
precision
;
int32_t
srcGroupId
;
uint8_t
precision
;
}
SExchangeLogicNode
;
typedef
enum
EWindowType
{
WINDOW_TYPE_INTERVAL
=
1
,
WINDOW_TYPE_SESSION
,
WINDOW_TYPE_STATE
}
EWindowType
;
typedef
enum
EWindowType
{
WINDOW_TYPE_INTERVAL
=
1
,
WINDOW_TYPE_SESSION
,
WINDOW_TYPE_STATE
}
EWindowType
;
typedef
struct
SWindowLogicNode
{
SLogicNode
node
;
SLogicNode
node
;
EWindowType
winType
;
SNodeList
*
pFuncs
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
SFillNode
*
pFill
;
int64_t
sessionGap
;
SNode
*
pTspk
;
SNode
*
pStateExpr
;
int8_t
triggerType
;
int64_t
watermark
;
SNodeList
*
pFuncs
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
SFillNode
*
pFill
;
int64_t
sessionGap
;
SNode
*
pTspk
;
SNode
*
pStateExpr
;
int8_t
triggerType
;
int64_t
watermark
;
}
SWindowLogicNode
;
typedef
struct
SSortLogicNode
{
...
...
@@ -137,59 +129,60 @@ typedef enum ESubplanType {
typedef
struct
SSubplanId
{
uint64_t
queryId
;
int32_t
groupId
;
int32_t
subplanId
;
int32_t
groupId
;
int32_t
subplanId
;
}
SSubplanId
;
typedef
struct
SLogicSubplan
{
ENodeType
type
;
SSubplanId
id
;
SNodeList
*
pChildren
;
SNodeList
*
pParents
;
SLogicNode
*
pNode
;
ESubplanType
subplanType
;
ENodeType
type
;
SSubplanId
id
;
SNodeList
*
pChildren
;
SNodeList
*
pParents
;
SLogicNode
*
pNode
;
ESubplanType
subplanType
;
SVgroupsInfo
*
pVgroupList
;
int32_t
level
;
int32_t
splitFlag
;
int32_t
level
;
int32_t
splitFlag
;
}
SLogicSubplan
;
typedef
struct
SQueryLogicPlan
{
ENodeType
type
;
ENodeType
type
;
SNodeList
*
pTopSubplans
;
}
SQueryLogicPlan
;
typedef
struct
SSlotDescNode
{
ENodeType
type
;
int16_t
slotId
;
int16_t
slotId
;
SDataType
dataType
;
bool
reserve
;
bool
output
;
bool
tag
;
bool
reserve
;
bool
output
;
bool
tag
;
}
SSlotDescNode
;
typedef
struct
SDataBlockDescNode
{
ENodeType
type
;
int16_t
dataBlockId
;
ENodeType
type
;
int16_t
dataBlockId
;
SNodeList
*
pSlots
;
int32_t
totalRowSize
;
int32_t
outputRowSize
;
uint8_t
precision
;
int32_t
totalRowSize
;
int32_t
outputRowSize
;
uint8_t
precision
;
}
SDataBlockDescNode
;
typedef
struct
SPhysiNode
{
ENodeType
type
;
ENodeType
type
;
SDataBlockDescNode
*
pOutputDataBlockDesc
;
SNode
*
pConditions
;
SNodeList
*
pChildren
;
struct
SPhysiNode
*
pParent
;
SNode
*
pConditions
;
SNodeList
*
pChildren
;
struct
SPhysiNode
*
pParent
;
}
SPhysiNode
;
typedef
struct
SScanPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pScanCols
;
uint64_t
uid
;
// unique id of the table
int8_t
tableType
;
SName
tableName
;
SNodeList
*
pScanPseudoCols
;
uint64_t
uid
;
// unique id of the table
int8_t
tableType
;
SName
tableName
;
}
SScanPhysiNode
;
typedef
SScanPhysiNode
STagScanPhysiNode
;
...
...
@@ -197,23 +190,23 @@ typedef SScanPhysiNode SStreamScanPhysiNode;
typedef
struct
SSystemTableScanPhysiNode
{
SScanPhysiNode
scan
;
SEpSet
mgmtEpSet
;
bool
showRewrite
;
int32_t
accountId
;
SEpSet
mgmtEpSet
;
bool
showRewrite
;
int32_t
accountId
;
}
SSystemTableScanPhysiNode
;
typedef
struct
STableScanPhysiNode
{
SScanPhysiNode
scan
;
uint8_t
scanSeq
[
2
];
// first is scan count, and second is reverse scan count
STimeWindow
scanRange
;
double
ratio
;
int32_t
dataRequired
;
SNodeList
*
pDynamicScanFuncs
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
uint8_t
scanSeq
[
2
];
// first is scan count, and second is reverse scan count
STimeWindow
scanRange
;
double
ratio
;
int32_t
dataRequired
;
SNodeList
*
pDynamicScanFuncs
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
}
STableScanPhysiNode
;
typedef
STableScanPhysiNode
STableSeqScanPhysiNode
;
...
...
@@ -221,89 +214,89 @@ typedef STableScanPhysiNode STableSeqScanPhysiNode;
typedef
struct
SProjectPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pProjections
;
int64_t
limit
;
int64_t
offset
;
int64_t
slimit
;
int64_t
soffset
;
int64_t
limit
;
int64_t
offset
;
int64_t
slimit
;
int64_t
soffset
;
}
SProjectPhysiNode
;
typedef
struct
SJoinPhysiNode
{
SPhysiNode
node
;
EJoinType
joinType
;
SNode
*
pOnConditions
;
// in or out tuple ?
EJoinType
joinType
;
SNode
*
pOnConditions
;
// in or out tuple ?
SNodeList
*
pTargets
;
}
SJoinPhysiNode
;
typedef
struct
SAggPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of group_by_clause and parameter expression of aggregate function
SNodeList
*
pExprs
;
// these are expression list of group_by_clause and parameter expression of aggregate function
SNodeList
*
pGroupKeys
;
SNodeList
*
pAggFuncs
;
}
SAggPhysiNode
;
typedef
struct
SDownstreamSourceNode
{
ENodeType
type
;
ENodeType
type
;
SQueryNodeAddr
addr
;
uint64_t
taskId
;
uint64_t
schedId
;
uint64_t
taskId
;
uint64_t
schedId
;
}
SDownstreamSourceNode
;
typedef
struct
SExchangePhysiNode
{
SPhysiNode
node
;
int32_t
srcGroupId
;
// group id of datasource suplans
int32_t
srcGroupId
;
// group id of datasource suplans
SNodeList
*
pSrcEndPoints
;
// element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
}
SExchangePhysiNode
;
typedef
struct
SWinodwPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of parameter expression of function
SNodeList
*
pExprs
;
// these are expression list of parameter expression of function
SNodeList
*
pFuncs
;
SNode
*
pTspk
;
// timestamp primary key
int8_t
triggerType
;
int64_t
watermark
;
SNode
*
pTspk
;
// timestamp primary key
int8_t
triggerType
;
int64_t
watermark
;
}
SWinodwPhysiNode
;
typedef
struct
SIntervalPhysiNode
{
SWinodwPhysiNode
window
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
SFillNode
*
pFill
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
SFillNode
*
pFill
;
}
SIntervalPhysiNode
;
typedef
struct
SMultiTableIntervalPhysiNode
{
SIntervalPhysiNode
interval
;
SNodeList
*
pPartitionKeys
;
SNodeList
*
pPartitionKeys
;
}
SMultiTableIntervalPhysiNode
;
typedef
struct
SSessionWinodwPhysiNode
{
SWinodwPhysiNode
window
;
int64_t
gap
;
int64_t
gap
;
}
SSessionWinodwPhysiNode
;
typedef
struct
SStateWinodwPhysiNode
{
SWinodwPhysiNode
window
;
SNode
*
pStateKey
;
SNode
*
pStateKey
;
}
SStateWinodwPhysiNode
;
typedef
struct
SSortPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList
*
pSortKeys
;
// element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
SNodeList
*
pExprs
;
// these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList
*
pSortKeys
;
// element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
SNodeList
*
pTargets
;
}
SSortPhysiNode
;
typedef
struct
SPartitionPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of partition_by_clause
SNodeList
*
pExprs
;
// these are expression list of partition_by_clause
SNodeList
*
pPartitionKeys
;
SNodeList
*
pTargets
;
}
SPartitionPhysiNode
;
typedef
struct
SDataSinkNode
{
ENodeType
type
;
ENodeType
type
;
SDataBlockDescNode
*
pInputDataBlockDesc
;
}
SDataSinkNode
;
...
...
@@ -313,45 +306,41 @@ typedef struct SDataDispatcherNode {
typedef
struct
SDataInserterNode
{
SDataSinkNode
sink
;
int32_t
numOfTables
;
uint32_t
size
;
char
*
pData
;
int32_t
numOfTables
;
uint32_t
size
;
char
*
pData
;
}
SDataInserterNode
;
typedef
struct
SSubplan
{
ENodeType
type
;
SSubplanId
id
;
// unique id of the subplan
ESubplanType
subplanType
;
int32_t
msgType
;
// message type for subplan, used to denote the send message type to vnode.
int32_t
level
;
// the execution level of current subplan, starting from 0 in a top-down manner.
char
dbFName
[
TSDB_DB_FNAME_LEN
];
SQueryNodeAddr
execNode
;
// for the scan/modify subplan, the optional execution node
SQueryNodeStat
execNodeStat
;
// only for scan subplan
SNodeList
*
pChildren
;
// the datasource subplan,from which to fetch the result
SNodeList
*
pParents
;
// the data destination subplan, get data from current subplan
SPhysiNode
*
pNode
;
// physical plan of current subplan
SDataSinkNode
*
pDataSink
;
// data of the subplan flow into the datasink
ENodeType
type
;
SSubplanId
id
;
// unique id of the subplan
ESubplanType
subplanType
;
int32_t
msgType
;
// message type for subplan, used to denote the send message type to vnode.
int32_t
level
;
// the execution level of current subplan, starting from 0 in a top-down manner.
char
dbFName
[
TSDB_DB_FNAME_LEN
];
SQueryNodeAddr
execNode
;
// for the scan/modify subplan, the optional execution node
SQueryNodeStat
execNodeStat
;
// only for scan subplan
SNodeList
*
pChildren
;
// the datasource subplan,from which to fetch the result
SNodeList
*
pParents
;
// the data destination subplan, get data from current subplan
SPhysiNode
*
pNode
;
// physical plan of current subplan
SDataSinkNode
*
pDataSink
;
// data of the subplan flow into the datasink
}
SSubplan
;
typedef
enum
EExplainMode
{
EXPLAIN_MODE_DISABLE
=
1
,
EXPLAIN_MODE_STATIC
,
EXPLAIN_MODE_ANALYZE
}
EExplainMode
;
typedef
enum
EExplainMode
{
EXPLAIN_MODE_DISABLE
=
1
,
EXPLAIN_MODE_STATIC
,
EXPLAIN_MODE_ANALYZE
}
EExplainMode
;
typedef
struct
SExplainInfo
{
EExplainMode
mode
;
bool
verbose
;
double
ratio
;
bool
verbose
;
double
ratio
;
}
SExplainInfo
;
typedef
struct
SQueryPlan
{
ENodeType
type
;
uint64_t
queryId
;
int32_t
numOfSubplans
;
SNodeList
*
pSubplans
;
// Element is SNodeListNode. The execution level of subplan, starting from 0.
ENodeType
type
;
uint64_t
queryId
;
int32_t
numOfSubplans
;
SNodeList
*
pSubplans
;
// Element is SNodeListNode. The execution level of subplan, starting from 0.
SExplainInfo
explainInfo
;
SNodeList
*
pPlaceholderValues
;
SNodeList
*
pPlaceholderValues
;
}
SQueryPlan
;
void
nodesWalkPhysiPlan
(
SNode
*
pNode
,
FNodeWalker
walker
,
void
*
pContext
);
...
...
include/libs/nodes/querynodes.h
浏览文件 @
928c60e2
...
...
@@ -293,7 +293,10 @@ typedef struct SExplainStmt {
void
nodesWalkSelectStmt
(
SSelectStmt
*
pSelect
,
ESqlClause
clause
,
FNodeWalker
walker
,
void
*
pContext
);
void
nodesRewriteSelectStmt
(
SSelectStmt
*
pSelect
,
ESqlClause
clause
,
FNodeRewriter
rewriter
,
void
*
pContext
);
int32_t
nodesCollectColumns
(
SSelectStmt
*
pSelect
,
ESqlClause
clause
,
const
char
*
pTableAlias
,
SNodeList
**
pCols
);
typedef
enum
ECollectColType
{
COLLECT_COL_TYPE_COL
=
1
,
COLLECT_COL_TYPE_TAG
,
COLLECT_COL_TYPE_ALL
}
ECollectColType
;
int32_t
nodesCollectColumns
(
SSelectStmt
*
pSelect
,
ESqlClause
clause
,
const
char
*
pTableAlias
,
ECollectColType
type
,
SNodeList
**
pCols
);
typedef
bool
(
*
FFuncClassifier
)(
int32_t
funcId
);
int32_t
nodesCollectFuncs
(
SSelectStmt
*
pSelect
,
FFuncClassifier
classifier
,
SNodeList
**
pFuncs
);
...
...
source/common/src/tdatablock.c
浏览文件 @
928c60e2
...
...
@@ -311,6 +311,8 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
memcpy
(
pColumnInfoData
->
pData
,
pSource
->
pData
,
pSource
->
info
.
bytes
*
numOfRows
);
}
pColumnInfoData
->
hasNull
=
pSource
->
hasNull
;
pColumnInfoData
->
info
=
pSource
->
info
;
return
0
;
}
...
...
source/common/src/tmsg.c
浏览文件 @
928c60e2
...
...
@@ -517,7 +517,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
tEncodeI32
(
&
encoder
,
pReq
->
ttl
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfColumns
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfTags
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfSmas
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
commentLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
ast1Len
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
ast2Len
)
<
0
)
return
-
1
;
...
...
@@ -527,6 +526,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
tEncodeI8
(
&
encoder
,
pField
->
type
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pField
->
bytes
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pField
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pField
->
flags
)
<
0
)
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfTags
;
++
i
)
{
...
...
@@ -534,13 +534,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
tEncodeI8
(
&
encoder
,
pField
->
type
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pField
->
bytes
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pField
->
name
)
<
0
)
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfSmas
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pReq
->
pSmas
,
i
);
if
(
tEncodeI8
(
&
encoder
,
pField
->
type
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pField
->
bytes
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pField
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pField
->
flags
)
<
0
)
return
-
1
;
}
if
(
pReq
->
commentLen
>
0
)
{
...
...
@@ -571,15 +565,13 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
ttl
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfColumns
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfTags
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfSmas
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
commentLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
ast1Len
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
ast2Len
)
<
0
)
return
-
1
;
pReq
->
pColumns
=
taosArrayInit
(
pReq
->
numOfColumns
,
sizeof
(
SField
));
pReq
->
pTags
=
taosArrayInit
(
pReq
->
numOfTags
,
sizeof
(
SField
));
pReq
->
pSmas
=
taosArrayInit
(
pReq
->
numOfSmas
,
sizeof
(
SField
));
if
(
pReq
->
pColumns
==
NULL
||
pReq
->
pTags
==
NULL
||
pReq
->
pSmas
==
NULL
)
{
if
(
pReq
->
pColumns
==
NULL
||
pReq
->
pTags
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
...
...
@@ -589,6 +581,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if
(
tDecodeI8
(
&
decoder
,
&
field
.
type
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
field
.
bytes
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
field
.
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
field
.
flags
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pReq
->
pColumns
,
&
field
)
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -600,23 +593,13 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if
(
tDecodeI8
(
&
decoder
,
&
field
.
type
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
field
.
bytes
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
field
.
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
field
.
flags
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pReq
->
pTags
,
&
field
)
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
}
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfSmas
;
++
i
)
{
SField
field
=
{
0
};
if
(
tDecodeI8
(
&
decoder
,
&
field
.
type
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
field
.
bytes
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
field
.
name
)
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pReq
->
pSmas
,
&
field
)
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
}
if
(
pReq
->
commentLen
>
0
)
{
pReq
->
comment
=
taosMemoryMalloc
(
pReq
->
commentLen
);
if
(
pReq
->
comment
==
NULL
)
return
-
1
;
...
...
@@ -644,13 +627,11 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
void
tFreeSMCreateStbReq
(
SMCreateStbReq
*
pReq
)
{
taosArrayDestroy
(
pReq
->
pColumns
);
taosArrayDestroy
(
pReq
->
pTags
);
taosArrayDestroy
(
pReq
->
pSmas
);
taosMemoryFreeClear
(
pReq
->
comment
);
taosMemoryFreeClear
(
pReq
->
pAst1
);
taosMemoryFreeClear
(
pReq
->
pAst2
);
pReq
->
pColumns
=
NULL
;
pReq
->
pTags
=
NULL
;
pReq
->
pSmas
=
NULL
;
}
int32_t
tSerializeSMDropStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropStbReq
*
pReq
)
{
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
928c60e2
...
...
@@ -358,13 +358,11 @@ typedef struct {
int32_t
ttl
;
int32_t
numOfColumns
;
int32_t
numOfTags
;
int32_t
numOfSmas
;
int32_t
commentLen
;
int32_t
ast1Len
;
int32_t
ast2Len
;
SSchema
*
pColumns
;
SSchema
*
pTags
;
SSchema
*
pSmas
;
char
*
comment
;
char
*
pAst1
;
char
*
pAst2
;
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
928c60e2
...
...
@@ -40,12 +40,12 @@ extern "C" {
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_COL_NAME_LEN
((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
typedef
int32_t
(
*
MndMsgFp
)(
SNodeMsg
*
pMsg
);
typedef
int32_t
(
*
MndInitFp
)(
SMnode
*
pMnode
);
typedef
void
(
*
MndCleanupFp
)(
SMnode
*
pMnode
);
typedef
int32_t
(
*
ShowRetrieveFp
)(
SNodeMsg
*
pMsg
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
typedef
int32_t
(
*
ShowRetrieveFp
)(
SNodeMsg
*
pMsg
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
typedef
void
(
*
ShowFreeIterFp
)(
SMnode
*
pMnode
,
void
*
pIter
);
typedef
struct
SQWorkerMgmt
SQHandle
;
...
...
@@ -84,32 +84,32 @@ typedef struct {
int64_t
timeseriesAllowed
;
}
SGrantInfo
;
struct
SMnode
{
int32_t
selfId
;
int64_t
clusterId
;
int8_t
replica
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
tmr_h
timer
;
tmr_h
transTimer
;
tmr_h
mqTimer
;
tmr_h
telemTimer
;
char
*
path
;
int64_t
checkTime
;
SSdb
*
pSdb
;
SMgmtWrapper
*
pWrapper
;
SArray
*
pSteps
;
SQHandle
*
pQuery
;
SShowMgmt
showMgmt
;
SProfileMgmt
profileMgmt
;
STelemMgmt
telemMgmt
;
SSyncMgmt
syncMgmt
;
SHashObj
*
infosMeta
;
SHashObj
*
perfsMeta
;
SGrantInfo
grant
;
MndMsgFp
msgFp
[
TDMT_MAX
];
SMsgCb
msgCb
;
};
typedef
struct
SMnode
{
int32_t
selfId
;
int64_t
clusterId
;
int8_t
replica
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
tmr_h
timer
;
tmr_h
transTimer
;
tmr_h
mqTimer
;
tmr_h
telemTimer
;
char
*
path
;
int64_t
checkTime
;
SSdb
*
pSdb
;
SMgmtWrapper
*
pWrapper
;
SArray
*
pSteps
;
SQHandle
*
pQuery
;
SShowMgmt
showMgmt
;
SProfileMgmt
profileMgmt
;
STelemMgmt
telemMgmt
;
SSyncMgmt
syncMgmt
;
SHashObj
*
infosMeta
;
SHashObj
*
perfsMeta
;
SGrantInfo
grant
;
MndMsgFp
msgFp
[
TDMT_MAX
];
SMsgCb
msgCb
;
}
SMnode
;
void
mndSetMsgHandle
(
SMnode
*
pMnode
,
tmsg_t
msgType
,
MndMsgFp
fp
);
int64_t
mndGenerateUid
(
char
*
name
,
int32_t
len
);
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
928c60e2
...
...
@@ -72,8 +72,8 @@ void mndCleanupStb(SMnode *pMnode) {}
SSdbRaw
*
mndStbActionEncode
(
SStbObj
*
pStb
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
size
=
sizeof
(
SStbObj
)
+
(
pStb
->
numOfColumns
+
pStb
->
numOfTags
+
pStb
->
numOfSmas
)
*
sizeof
(
SSchema
)
+
+
pStb
->
commentLen
+
pStb
->
ast1Len
+
pStb
->
ast2Len
+
TSDB_STB_RESERVE_SIZE
;
int32_t
size
=
sizeof
(
SStbObj
)
+
(
pStb
->
numOfColumns
+
pStb
->
numOfTags
)
*
sizeof
(
SSchema
)
+
+
pStb
->
commentLen
+
pStb
->
ast1Len
+
pStb
->
ast2Len
+
TSDB_STB_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_STB
,
TSDB_STB_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
_OVER
;
...
...
@@ -91,7 +91,6 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
ttl
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfColumns
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfTags
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
numOfSmas
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
commentLen
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
ast1Len
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pStb
->
ast2Len
,
_OVER
)
...
...
@@ -112,14 +111,6 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
,
_OVER
)
}
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfSmas
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
pSmas
[
i
];
SDB_SET_INT8
(
pRaw
,
dataPos
,
pSchema
->
type
,
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pSchema
->
colId
,
_OVER
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pSchema
->
bytes
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
,
_OVER
)
}
if
(
pStb
->
commentLen
>
0
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pStb
->
comment
,
pStb
->
commentLen
,
_OVER
)
}
...
...
@@ -178,15 +169,13 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
ttl
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
numOfColumns
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
numOfTags
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
numOfSmas
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
commentLen
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
ast1Len
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pStb
->
ast2Len
,
_OVER
)
pStb
->
pColumns
=
taosMemoryCalloc
(
pStb
->
numOfColumns
,
sizeof
(
SSchema
));
pStb
->
pTags
=
taosMemoryCalloc
(
pStb
->
numOfTags
,
sizeof
(
SSchema
));
pStb
->
pSmas
=
taosMemoryCalloc
(
pStb
->
numOfSmas
,
sizeof
(
SSchema
));
if
(
pStb
->
pColumns
==
NULL
||
pStb
->
pTags
==
NULL
||
pStb
->
pSmas
==
NULL
)
{
if
(
pStb
->
pColumns
==
NULL
||
pStb
->
pTags
==
NULL
)
{
goto
_OVER
;
}
...
...
@@ -206,14 +195,6 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
,
_OVER
)
}
for
(
int32_t
i
=
0
;
i
<
pStb
->
numOfSmas
;
++
i
)
{
SSchema
*
pSchema
=
&
pStb
->
pSmas
[
i
];
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pSchema
->
type
,
_OVER
)
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
pSchema
->
colId
,
_OVER
)
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pSchema
->
bytes
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSchema
->
name
,
TSDB_COL_NAME_LEN
,
_OVER
)
}
if
(
pStb
->
commentLen
>
0
)
{
pStb
->
comment
=
taosMemoryCalloc
(
pStb
->
commentLen
,
1
);
if
(
pStb
->
comment
==
NULL
)
goto
_OVER
;
...
...
@@ -291,18 +272,6 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
}
}
if
(
pOld
->
numOfSmas
<
pNew
->
numOfSmas
)
{
void
*
pSmas
=
taosMemoryMalloc
(
pNew
->
numOfSmas
*
sizeof
(
SSchema
));
if
(
pSmas
!=
NULL
)
{
taosMemoryFree
(
pOld
->
pSmas
);
pOld
->
pSmas
=
pSmas
;
}
else
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mTrace
(
"stb:%s, failed to perform update action since %s"
,
pOld
->
name
,
terrstr
());
taosWUnLockLatch
(
&
pOld
->
lock
);
}
}
if
(
pOld
->
commentLen
<
pNew
->
commentLen
)
{
void
*
comment
=
taosMemoryMalloc
(
pNew
->
commentLen
);
if
(
comment
!=
NULL
)
{
...
...
@@ -411,11 +380,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req
.
schemaTag
.
nCols
=
pStb
->
numOfTags
;
req
.
schemaTag
.
pSchema
=
pStb
->
pTags
;
// TODO: remove here
for
(
int
iCol
=
0
;
iCol
<
req
.
schema
.
nCols
;
iCol
++
)
{
req
.
schema
.
pSchema
[
iCol
].
flags
=
SCHEMA_SMA_ON
;
}
if
(
req
.
rollup
)
{
req
.
pRSmaParam
.
xFilesFactor
=
pStb
->
xFilesFactor
;
req
.
pRSmaParam
.
delay
=
pStb
->
delay
;
...
...
@@ -674,7 +638,6 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
stbObj
.
ttl
=
pCreate
->
ttl
;
stbObj
.
numOfColumns
=
pCreate
->
numOfColumns
;
stbObj
.
numOfTags
=
pCreate
->
numOfTags
;
stbObj
.
numOfSmas
=
pCreate
->
numOfSmas
;
stbObj
.
commentLen
=
pCreate
->
commentLen
;
if
(
stbObj
.
commentLen
>
0
)
{
stbObj
.
comment
=
taosMemoryCalloc
(
stbObj
.
commentLen
,
1
);
...
...
@@ -707,8 +670,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
stbObj
.
pColumns
=
taosMemoryMalloc
(
stbObj
.
numOfColumns
*
sizeof
(
SSchema
));
stbObj
.
pTags
=
taosMemoryMalloc
(
stbObj
.
numOfTags
*
sizeof
(
SSchema
));
stbObj
.
pSmas
=
taosMemoryMalloc
(
stbObj
.
numOfSmas
*
sizeof
(
SSchema
));
if
(
stbObj
.
pColumns
==
NULL
||
stbObj
.
pTags
==
NULL
||
stbObj
.
pSmas
==
NULL
)
{
if
(
stbObj
.
pColumns
==
NULL
||
stbObj
.
pTags
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
...
...
@@ -718,6 +680,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
SSchema
*
pSchema
=
&
stbObj
.
pColumns
[
i
];
pSchema
->
type
=
pField
->
type
;
pSchema
->
bytes
=
pField
->
bytes
;
pSchema
->
flags
=
pField
->
flags
;
memcpy
(
pSchema
->
name
,
pField
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
colId
=
stbObj
.
nextColId
;
stbObj
.
nextColId
++
;
...
...
@@ -733,18 +696,6 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
stbObj
.
nextColId
++
;
}
for
(
int32_t
i
=
0
;
i
<
stbObj
.
numOfSmas
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pCreate
->
pSmas
,
i
);
SSchema
*
pSchema
=
&
stbObj
.
pSmas
[
i
];
SSchema
*
pColSchema
=
mndFindStbColumns
(
&
stbObj
,
pField
->
name
);
if
(
pColSchema
==
NULL
)
{
mError
(
"stb:%s, sma:%s not found in columns"
,
stbObj
.
name
,
pField
->
name
);
terrno
=
TSDB_CODE_MND_INVALID_STB_OPTION
;
return
-
1
;
}
memcpy
(
pSchema
,
pColSchema
,
sizeof
(
SSchema
));
}
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_CREATE_STB
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
928c60e2
...
...
@@ -22,13 +22,15 @@ static int32_t mndInitWal(SMnode *pMnode) {
char
path
[
PATH_MAX
]
=
{
0
};
snprintf
(
path
,
sizeof
(
path
),
"%s%swal"
,
pMnode
->
path
,
TD_DIRSEP
);
SWalCfg
cfg
=
{.
vgId
=
1
,
.
fsyncPeriod
=
0
,
.
rollPeriod
=
-
1
,
.
segSize
=
-
1
,
.
retentionPeriod
=
-
1
,
.
retentionSize
=
-
1
,
.
level
=
TAOS_WAL_FSYNC
};
SWalCfg
cfg
=
{
.
vgId
=
1
,
.
fsyncPeriod
=
0
,
.
rollPeriod
=
-
1
,
.
segSize
=
-
1
,
.
retentionPeriod
=
-
1
,
.
retentionSize
=
-
1
,
.
level
=
TAOS_WAL_FSYNC
,
};
pMgmt
->
pWal
=
walOpen
(
path
,
&
cfg
);
if
(
pMgmt
->
pWal
==
NULL
)
return
-
1
;
...
...
@@ -54,62 +56,62 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
int64_t
first
=
walGetFirstVer
(
pWal
);
int64_t
last
=
walGetLastVer
(
pWal
);
mDebug
(
"start to restore
sdb wal, sdb ver:%"
PRId64
", wal
first:%"
PRId64
" last:%"
PRId64
,
lastSdbVer
,
first
,
last
);
mDebug
(
"start to restore
wal, sdbver:%"
PRId64
",
first:%"
PRId64
" last:%"
PRId64
,
lastSdbVer
,
first
,
last
);
first
=
TMAX
(
lastSdbVer
+
1
,
first
);
for
(
int64_t
ver
=
first
;
ver
>=
0
&&
ver
<=
last
;
++
ver
)
{
if
(
walReadWithHandle
(
pHandle
,
ver
)
<
0
)
{
mError
(
"
failed to read by wal handle since %s, ver:%"
PRId64
,
terrstr
(),
ver
);
goto
WAL_RESTORE
_OVER
;
mError
(
"
ver:%"
PRId64
", failed to read from wal since %s"
,
ver
,
terrstr
()
);
goto
_OVER
;
}
SWalHead
*
pHead
=
pHandle
->
pHead
;
int64_t
sdbVer
=
sdbUpdateVer
(
pSdb
,
0
);
if
(
sdbVer
+
1
!=
ver
)
{
terrno
=
TSDB_CODE_SDB_INVALID_WAl_VER
;
mError
(
"
failed to read wal from sdb, sdbVer:%"
PRId64
" inconsistent with ver:%"
PRId64
,
sdbVer
,
v
er
);
goto
WAL_RESTORE
_OVER
;
mError
(
"
ver:%"
PRId64
", failed to write to sdb, since inconsistent with sdbver:%"
PRId64
,
ver
,
sdbV
er
);
goto
_OVER
;
}
mTrace
(
"
wal
:%"
PRId64
", will be restored, content:%p"
,
ver
,
pHead
->
head
.
body
);
mTrace
(
"
ver
:%"
PRId64
", will be restored, content:%p"
,
ver
,
pHead
->
head
.
body
);
if
(
sdbWriteWithoutFree
(
pSdb
,
(
void
*
)
pHead
->
head
.
body
)
<
0
)
{
mError
(
"
failed to read wal from sdb since %s, ver:%"
PRId64
,
terrstr
(),
ver
);
goto
WAL_RESTORE
_OVER
;
mError
(
"
ver:%"
PRId64
", failed to write to sdb since %s"
,
ver
,
terrstr
()
);
goto
_OVER
;
}
sdbUpdateVer
(
pSdb
,
1
);
mDebug
(
"
wal
:%"
PRId64
", is restored"
,
ver
);
mDebug
(
"
ver
:%"
PRId64
", is restored"
,
ver
);
}
int64_t
sdbVer
=
sdbUpdateVer
(
pSdb
,
0
);
mDebug
(
"restore
sdb wal finished, sdb
ver:%"
PRId64
,
sdbVer
);
mDebug
(
"restore
wal finished, sdb
ver:%"
PRId64
,
sdbVer
);
mndTransPullup
(
pMnode
);
sdbVer
=
sdbUpdateVer
(
pSdb
,
0
);
mDebug
(
"pullup trans finished, sdb
ver:%"
PRId64
,
sdbVer
);
mDebug
(
"pullup trans finished, sdbver:%"
PRId64
,
sdbVer
);
if
(
sdbVer
!=
lastSdbVer
)
{
mInfo
(
"sdb restored from %"
PRId64
" to %"
PRId64
", write file"
,
lastSdbVer
,
sdbVer
);
if
(
sdbWriteFile
(
pSdb
)
!=
0
)
{
goto
WAL_RESTORE
_OVER
;
goto
_OVER
;
}
if
(
walCommit
(
pWal
,
sdbVer
)
!=
0
)
{
goto
WAL_RESTORE
_OVER
;
goto
_OVER
;
}
if
(
walBeginSnapshot
(
pWal
,
sdbVer
)
<
0
)
{
goto
WAL_RESTORE
_OVER
;
goto
_OVER
;
}
if
(
walEndSnapshot
(
pWal
)
<
0
)
{
goto
WAL_RESTORE
_OVER
;
goto
_OVER
;
}
}
code
=
0
;
WAL_RESTORE
_OVER:
_OVER:
walCloseReadHandle
(
pHandle
);
return
code
;
}
...
...
@@ -158,11 +160,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
int64_t
ver
=
sdbUpdateVer
(
pSdb
,
1
);
if
(
walWrite
(
pWal
,
ver
,
1
,
pRaw
,
sdbGetRawTotalSize
(
pRaw
))
<
0
)
{
sdbUpdateVer
(
pSdb
,
-
1
);
mError
(
"
failed to write raw:%p since %s, ver:%"
PRId64
,
pRaw
,
terrstr
(),
ver
);
mError
(
"
ver:%"
PRId64
", failed to write raw:%p to wal since %s"
,
ver
,
pRaw
,
terrstr
()
);
return
-
1
;
}
mTrace
(
"
raw:%p, write to wal, ver:%"
PRId64
,
pRaw
,
ver
);
mTrace
(
"
ver:%"
PRId64
", write to wal, raw:%p"
,
ver
,
pRaw
);
walCommit
(
pWal
,
ver
);
walFsync
(
pWal
,
true
);
...
...
source/dnode/mnode/impl/src/mndTelem.c
浏览文件 @
928c60e2
...
...
@@ -146,7 +146,6 @@ int32_t mndInitTelem(SMnode* pMnode) {
taosGetEmail
(
pMgmt
->
email
,
sizeof
(
pMgmt
->
email
));
mndSetMsgHandle
(
pMnode
,
TDMT_MND_TELEM_TIMER
,
mndProcessTelemTimer
);
mDebug
(
"mnode telemetry is initialized"
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
928c60e2
...
...
@@ -21,8 +21,8 @@
#include "mndTrans.h"
#include "tbase64.h"
#define
TSDB_
USER_VER_NUMBER 1
#define
TSDB_
USER_RESERVE_SIZE 64
#define USER_VER_NUMBER 1
#define USER_RESERVE_SIZE 64
static
int32_t
mndCreateDefaultUsers
(
SMnode
*
pMnode
);
static
SSdbRaw
*
mndUserActionEncode
(
SUserObj
*
pUser
);
...
...
@@ -35,7 +35,7 @@ static int32_t mndProcessCreateUserReq(SNodeMsg *pReq);
static
int32_t
mndProcessAlterUserReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessDropUserReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndProcessGetUserAuthReq
(
SNodeMsg
*
pReq
);
static
int32_t
mndRetrieveUsers
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
int32_t
mndRetrieveUsers
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextUser
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitUser
(
SMnode
*
pMnode
)
{
...
...
@@ -93,9 +93,9 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
int32_t
numOfReadDbs
=
taosHashGetSize
(
pUser
->
readDbs
);
int32_t
numOfWriteDbs
=
taosHashGetSize
(
pUser
->
writeDbs
);
int32_t
size
=
sizeof
(
SUserObj
)
+
TSDB_
USER_RESERVE_SIZE
+
(
numOfReadDbs
+
numOfWriteDbs
)
*
TSDB_DB_FNAME_LEN
;
int32_t
size
=
sizeof
(
SUserObj
)
+
USER_RESERVE_SIZE
+
(
numOfReadDbs
+
numOfWriteDbs
)
*
TSDB_DB_FNAME_LEN
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_USER
,
TSDB_
USER_VER_NUMBER
,
size
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_USER
,
USER_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
USER_ENCODE_OVER
;
int32_t
dataPos
=
0
;
...
...
@@ -120,7 +120,7 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
db
=
taosHashIterate
(
pUser
->
writeDbs
,
db
);
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_
USER_RESERVE_SIZE
,
USER_ENCODE_OVER
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
USER_RESERVE_SIZE
,
USER_ENCODE_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
USER_ENCODE_OVER
)
terrno
=
0
;
...
...
@@ -142,7 +142,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
USER_DECODE_OVER
;
if
(
sver
!=
TSDB_
USER_VER_NUMBER
)
{
if
(
sver
!=
USER_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
goto
USER_DECODE_OVER
;
}
...
...
@@ -184,7 +184,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
taosHashPut
(
pUser
->
writeDbs
,
db
,
len
,
db
,
TSDB_DB_FNAME_LEN
);
}
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
TSDB_
USER_RESERVE_SIZE
,
USER_DECODE_OVER
)
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
USER_RESERVE_SIZE
,
USER_DECODE_OVER
)
terrno
=
0
;
...
...
@@ -639,7 +639,7 @@ GET_AUTH_OVER:
return
code
;
}
static
int32_t
mndRetrieveUsers
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
static
int32_t
mndRetrieveUsers
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
...
...
@@ -652,29 +652,29 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pB
if
(
pShow
->
pIter
==
NULL
)
break
;
cols
=
0
;
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
char
name
[
TSDB_USER_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_MAXSIZE_TO_VARSTR
(
name
,
pUser
->
user
,
pShow
->
bytes
[
cols
]);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
name
,
false
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
name
,
false
);
cols
++
;
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
const
char
*
src
=
pUser
->
superUser
?
"super"
:
"normal"
;
char
b
[
10
+
VARSTR_HEADER_SIZE
]
=
{
0
};
const
char
*
src
=
pUser
->
superUser
?
"super"
:
"normal"
;
char
b
[
10
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_SIZE_TO_VARSTR
(
b
,
src
,
strlen
(
src
));
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
b
,
false
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
b
,
false
);
cols
++
;
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pUser
->
createdTime
,
false
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pUser
->
createdTime
,
false
);
cols
++
;
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
STR_WITH_MAXSIZE_TO_VARSTR
(
name
,
pUser
->
acct
,
pShow
->
bytes
[
cols
]);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
name
,
false
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
name
,
false
);
numOfRows
++
;
sdbRelease
(
pSdb
,
pUser
);
...
...
source/dnode/mnode/impl/test/sma/sma.cpp
浏览文件 @
928c60e2
...
...
@@ -114,18 +114,15 @@ void* MndTestSma::BuildCreateBSmaStbReq(const char* stbname, int32_t* pContLen)
SMCreateStbReq
createReq
=
{
0
};
createReq
.
numOfColumns
=
3
;
createReq
.
numOfTags
=
1
;
createReq
.
numOfSmas
=
1
;
createReq
.
igExists
=
0
;
createReq
.
pColumns
=
taosArrayInit
(
createReq
.
numOfColumns
,
sizeof
(
SField
));
createReq
.
pTags
=
taosArrayInit
(
createReq
.
numOfTags
,
sizeof
(
SField
));
createReq
.
pSmas
=
taosArrayInit
(
createReq
.
numOfSmas
,
sizeof
(
SField
));
strcpy
(
createReq
.
name
,
stbname
);
PushField
(
createReq
.
pColumns
,
8
,
TSDB_DATA_TYPE_TIMESTAMP
,
"ts"
);
PushField
(
createReq
.
pColumns
,
2
,
TSDB_DATA_TYPE_TINYINT
,
"col1"
);
PushField
(
createReq
.
pColumns
,
8
,
TSDB_DATA_TYPE_BIGINT
,
"col2"
);
PushField
(
createReq
.
pTags
,
2
,
TSDB_DATA_TYPE_TINYINT
,
"tag1"
);
PushField
(
createReq
.
pSmas
,
2
,
TSDB_DATA_TYPE_TINYINT
,
"col1"
);
int32_t
tlen
=
tSerializeSMCreateStbReq
(
NULL
,
0
,
&
createReq
);
void
*
pHead
=
rpcMallocCont
(
tlen
);
...
...
@@ -190,7 +187,7 @@ void* MndTestSma::BuildDropTSmaReq(const char* smaname, int8_t igNotExists, int3
}
TEST_F
(
MndTestSma
,
01
_Create_Show_Meta_Drop_Restart_Stb
)
{
#if 0
#if 0
const char* dbname = "1.d1";
const char* stbname = "1.d1.stb";
const char* smaname = "1.d1.sma";
...
...
@@ -244,7 +241,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 0);
}
#endif
#endif
}
TEST_F
(
MndTestSma
,
02
_Create_Show_Meta_Drop_Restart_BSma
)
{
...
...
@@ -264,7 +261,7 @@ TEST_F(MndTestSma, 02_Create_Show_Meta_Drop_Restart_BSma) {
pReq
=
BuildCreateBSmaStbReq
(
stbname
,
&
contLen
);
pRsp
=
test
.
SendReq
(
TDMT_MND_CREATE_STB
,
pReq
,
contLen
);
ASSERT_EQ
(
pRsp
->
code
,
0
);
test
.
SendShowReq
(
TSDB_MGMT_TABLE_STB
,
"user_stables"
,
dbname
);
test
.
SendShowReq
(
TSDB_MGMT_TABLE_STB
,
"user_stables"
,
dbname
);
EXPECT_EQ
(
test
.
GetShowRows
(),
1
);
}
...
...
@@ -280,7 +277,7 @@ TEST_F(MndTestSma, 02_Create_Show_Meta_Drop_Restart_BSma) {
pReq
=
BuildDropStbReq
(
stbname
,
&
contLen
);
pRsp
=
test
.
SendReq
(
TDMT_MND_DROP_STB
,
pReq
,
contLen
);
ASSERT_EQ
(
pRsp
->
code
,
0
);
test
.
SendShowReq
(
TSDB_MGMT_TABLE_STB
,
"user_stables"
,
dbname
);
test
.
SendShowReq
(
TSDB_MGMT_TABLE_STB
,
"user_stables"
,
dbname
);
EXPECT_EQ
(
test
.
GetShowRows
(),
0
);
}
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
928c60e2
...
...
@@ -162,7 +162,4 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return
0
;
}
int64_t
sdbUpdateVer
(
SSdb
*
pSdb
,
int32_t
val
)
{
pSdb
->
curVer
+=
val
;
return
pSdb
->
curVer
;
}
\ No newline at end of file
int64_t
sdbUpdateVer
(
SSdb
*
pSdb
,
int32_t
val
)
{
return
atomic_add_fetch_64
(
&
pSdb
->
curVer
,
val
);
}
\ No newline at end of file
source/dnode/mnode/sdb/src/sdbFile.c
浏览文件 @
928c60e2
...
...
@@ -28,7 +28,7 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) {
if
(
fp
==
NULL
)
continue
;
if
((
*
fp
)(
pSdb
->
pMnode
)
!=
0
)
{
mError
(
"failed to deploy sdb:%
d since %s"
,
i
,
terrstr
());
mError
(
"failed to deploy sdb:%
s since %s"
,
sdbTableName
(
i
)
,
terrstr
());
return
-
1
;
}
}
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
928c60e2
...
...
@@ -123,7 +123,8 @@ _exit:
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
)
{
pLoad
->
vgId
=
TD_VID
(
pVnode
);
pLoad
->
syncState
=
TAOS_SYNC_STATE_LEADER
;
//pLoad->syncState = TAOS_SYNC_STATE_LEADER;
pLoad
->
syncState
=
syncGetMyRole
(
pVnode
->
sync
);
// sync integration
pLoad
->
numOfTables
=
metaGetTbNum
(
pVnode
->
pMeta
);
pLoad
->
numOfTimeSeries
=
400
;
pLoad
->
totalStorage
=
300
;
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
928c60e2
...
...
@@ -220,7 +220,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
// The first row of a new block does not belongs to the previous existed group
if
(
!
equal
&&
j
==
0
)
{
if
(
j
==
0
)
{
num
++
;
recordNewGroupKeys
(
pInfo
->
pGroupCols
,
pInfo
->
pGroupColVals
,
pBlock
,
j
,
numOfGroupCols
);
continue
;
...
...
source/libs/function/inc/functionMgtInt.h
浏览文件 @
928c60e2
...
...
@@ -24,7 +24,7 @@ extern "C" {
#define FUNCTION_NAME_MAX_LENGTH 32
#define FUNC_MGT_FUNC_CLASSIFICATION_MASK(n)
(1 << n)
#define FUNC_MGT_FUNC_CLASSIFICATION_MASK(n) (1 << n)
#define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0)
#define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1)
...
...
@@ -38,6 +38,7 @@ extern "C" {
#define FUNC_MGT_SPECIAL_DATA_REQUIRED FUNC_MGT_FUNC_CLASSIFICATION_MASK(9)
#define FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED FUNC_MGT_FUNC_CLASSIFICATION_MASK(10)
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
#define FUNC_MGT_SCAN_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(12)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
...
...
source/libs/function/src/builtins.c
浏览文件 @
928c60e2
...
...
@@ -807,7 +807,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
finalizeFunc
=
NULL
},
{.
name
=
"tbname"
,
.
type
=
FUNCTION_TYPE_TBNAME
,
.
classification
=
FUNC_MGT_PSEUDO_COLUMN_FUNC
,
.
classification
=
FUNC_MGT_PSEUDO_COLUMN_FUNC
|
FUNC_MGT_SCAN_PC_FUNC
,
.
translateFunc
=
translateTbnameColumn
,
.
getEnvFunc
=
NULL
,
.
initFunc
=
NULL
,
...
...
source/libs/function/src/functionMgt.c
浏览文件 @
928c60e2
...
...
@@ -15,12 +15,12 @@
#include "functionMgt.h"
#include "builtins.h"
#include "catalog.h"
#include "functionMgtInt.h"
#include "taos.h"
#include "taoserror.h"
#include "thash.h"
#include "builtins.h"
#include "catalog.h"
typedef
struct
SFuncMgtService
{
SHashObj
*
pFuncNameHashTable
;
...
...
@@ -28,22 +28,24 @@ typedef struct SFuncMgtService {
typedef
struct
SUdfInfo
{
SDataType
outputDt
;
int8_t
funcType
;
int8_t
funcType
;
}
SUdfInfo
;
static
SFuncMgtService
gFunMgtService
;
static
TdThreadOnce
functionHashTableInit
=
PTHREAD_ONCE_INIT
;
static
int32_t
initFunctionCode
=
0
;
static
TdThreadOnce
functionHashTableInit
=
PTHREAD_ONCE_INIT
;
static
int32_t
initFunctionCode
=
0
;
static
void
doInitFunctionTable
()
{
gFunMgtService
.
pFuncNameHashTable
=
taosHashInit
(
funcMgtBuiltinsNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
gFunMgtService
.
pFuncNameHashTable
=
taosHashInit
(
funcMgtBuiltinsNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
if
(
NULL
==
gFunMgtService
.
pFuncNameHashTable
)
{
initFunctionCode
=
TSDB_CODE_FAILED
;
return
;
}
for
(
int32_t
i
=
0
;
i
<
funcMgtBuiltinsNum
;
++
i
)
{
if
(
TSDB_CODE_SUCCESS
!=
taosHashPut
(
gFunMgtService
.
pFuncNameHashTable
,
funcMgtBuiltins
[
i
].
name
,
strlen
(
funcMgtBuiltins
[
i
].
name
),
&
i
,
sizeof
(
int32_t
)))
{
if
(
TSDB_CODE_SUCCESS
!=
taosHashPut
(
gFunMgtService
.
pFuncNameHashTable
,
funcMgtBuiltins
[
i
].
name
,
strlen
(
funcMgtBuiltins
[
i
].
name
),
&
i
,
sizeof
(
int32_t
)))
{
initFunctionCode
=
TSDB_CODE_FAILED
;
return
;
}
...
...
@@ -52,8 +54,9 @@ static void doInitFunctionTable() {
static
bool
isSpecificClassifyFunc
(
int32_t
funcId
,
uint64_t
classification
)
{
if
(
fmIsUserDefinedFunc
(
funcId
))
{
return
FUNC_MGT_AGG_FUNC
==
classification
?
FUNC_AGGREGATE_UDF_ID
==
funcId
:
(
FUNC_MGT_SCALAR_FUNC
==
classification
?
FUNC_SCALAR_UDF_ID
==
funcId
:
false
);
return
FUNC_MGT_AGG_FUNC
==
classification
?
FUNC_AGGREGATE_UDF_ID
==
funcId
:
(
FUNC_MGT_SCALAR_FUNC
==
classification
?
FUNC_SCALAR_UDF_ID
==
funcId
:
false
);
}
if
(
funcId
<
0
||
funcId
>=
funcMgtBuiltinsNum
)
{
return
false
;
...
...
@@ -63,7 +66,7 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
static
int32_t
getUdfInfo
(
SFmGetFuncInfoParam
*
pParam
,
SFunctionNode
*
pFunc
)
{
SFuncInfo
*
pInfo
=
NULL
;
int32_t
code
=
catalogGetUdfInfo
(
pParam
->
pCtg
,
pParam
->
pRpc
,
pParam
->
pMgmtEps
,
pFunc
->
functionName
,
&
pInfo
);
int32_t
code
=
catalogGetUdfInfo
(
pParam
->
pCtg
,
pParam
->
pRpc
,
pParam
->
pMgmtEps
,
pFunc
->
functionName
,
&
pInfo
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
...
...
@@ -122,33 +125,23 @@ int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
return
TSDB_CODE_FAILED
;
}
pFpSet
->
process
=
funcMgtBuiltins
[
funcId
].
sprocessFunc
;
pFpSet
->
getEnv
=
funcMgtBuiltins
[
funcId
].
getEnvFunc
;
pFpSet
->
getEnv
=
funcMgtBuiltins
[
funcId
].
getEnvFunc
;
return
TSDB_CODE_SUCCESS
;
}
bool
fmIsAggFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_AGG_FUNC
);
}
bool
fmIsAggFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_AGG_FUNC
);
}
bool
fmIsScalarFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_SCALAR_FUNC
);
}
bool
fmIsScalarFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_SCALAR_FUNC
);
}
bool
fmIsPseudoColumnFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_PSEUDO_COLUMN_FUNC
);
}
bool
fmIsPseudoColumnFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_PSEUDO_COLUMN_FUNC
);
}
bool
fmIsWindowPseudoColumnFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_WINDOW_PC_FUNC
);
}
bool
fmIsScanPseudoColumnFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_SCAN_PC_FUNC
);
}
bool
fmIsWindowClauseFunc
(
int32_t
funcId
)
{
return
fmIsAggFunc
(
funcId
)
||
fmIsWindowPseudoColumnFunc
(
funcId
);
}
bool
fmIsWindowPseudoColumnFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_WINDOW_PC_FUNC
);
}
bool
fmIs
NonstandardSQLFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_NONSTANDARD_SQL_FUNC
);
}
bool
fmIs
WindowClauseFunc
(
int32_t
funcId
)
{
return
fmIsAggFunc
(
funcId
)
||
fmIsWindowPseudoColumnFunc
(
funcId
);
}
bool
fmIsNonstandardSQLFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_NONSTANDARD_SQL_FUNC
);
}
bool
fmIsSpecialDataRequiredFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_SPECIAL_DATA_REQUIRED
);
...
...
@@ -158,13 +151,9 @@ bool fmIsDynamicScanOptimizedFunc(int32_t funcId) {
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED
);
}
bool
fmIsMultiResFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_MULTI_RES_FUNC
);
}
bool
fmIsMultiResFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_MULTI_RES_FUNC
);
}
bool
fmIsUserDefinedFunc
(
int32_t
funcId
)
{
return
funcId
>
FUNC_UDF_ID_START
;
}
bool
fmIsUserDefinedFunc
(
int32_t
funcId
)
{
return
funcId
>
FUNC_UDF_ID_START
;
}
void
fmFuncMgtDestroy
()
{
void
*
m
=
gFunMgtService
.
pFuncNameHashTable
;
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
928c60e2
...
...
@@ -224,6 +224,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_ALL_SCALAR_FIELDS
;
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
CLONE_NODE_LIST_FIELD
(
pScanCols
);
CLONE_NODE_LIST_FIELD
(
pScanPseudoCols
);
CLONE_OBJECT_FIELD
(
pMeta
,
tableMetaClone
);
CLONE_OBJECT_FIELD
(
pVgroupList
,
vgroupsInfoClone
);
CLONE_NODE_LIST_FIELD
(
pDynamicScanFuncs
);
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
928c60e2
...
...
@@ -467,6 +467,7 @@ static int32_t jsonToLogicPlanNode(const SJson* pJson, void* pObj) {
}
static
const
char
*
jkScanLogicPlanScanCols
=
"ScanCols"
;
static
const
char
*
jkScanLogicPlanScanPseudoCols
=
"ScanPseudoCols"
;
static
const
char
*
jkScanLogicPlanTableMetaSize
=
"TableMetaSize"
;
static
const
char
*
jkScanLogicPlanTableMeta
=
"TableMeta"
;
...
...
@@ -477,6 +478,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkScanLogicPlanScanCols
,
pNode
->
pScanCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkScanLogicPlanScanPseudoCols
,
pNode
->
pScanPseudoCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanLogicPlanTableMetaSize
,
TABLE_META_SIZE
(
pNode
->
pMeta
));
}
...
...
@@ -495,6 +499,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkScanLogicPlanScanCols
,
&
pNode
->
pScanCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkScanLogicPlanScanPseudoCols
,
&
pNode
->
pScanPseudoCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkScanLogicPlanTableMetaSize
,
&
objSize
);
}
...
...
@@ -670,6 +677,7 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) {
}
static
const
char
*
jkScanPhysiPlanScanCols
=
"ScanCols"
;
static
const
char
*
jkScanPhysiPlanScanPseudoCols
=
"ScanPseudoCols"
;
static
const
char
*
jkScanPhysiPlanTableId
=
"TableId"
;
static
const
char
*
jkScanPhysiPlanTableType
=
"TableType"
;
static
const
char
*
jkScanPhysiPlanTableName
=
"TableName"
;
...
...
@@ -681,6 +689,9 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkScanPhysiPlanScanCols
,
pNode
->
pScanCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkScanPhysiPlanScanPseudoCols
,
pNode
->
pScanPseudoCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanPhysiPlanTableId
,
pNode
->
uid
);
}
...
...
@@ -701,6 +712,9 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkScanPhysiPlanScanCols
,
&
pNode
->
pScanCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkScanPhysiPlanScanPseudoCols
,
&
pNode
->
pScanPseudoCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetUBigIntValue
(
pJson
,
jkScanPhysiPlanTableId
,
&
pNode
->
uid
);
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
928c60e2
...
...
@@ -1041,10 +1041,11 @@ bool nodesIsTimeorderQuery(const SNode* pQuery) { return false; }
bool
nodesIsTimelineQuery
(
const
SNode
*
pQuery
)
{
return
false
;
}
typedef
struct
SCollectColumnsCxt
{
int32_t
errCode
;
const
char
*
pTableAlias
;
SNodeList
*
pCols
;
SHashObj
*
pColHash
;
int32_t
errCode
;
const
char
*
pTableAlias
;
ECollectColType
collectType
;
SNodeList
*
pCols
;
SHashObj
*
pColHash
;
}
SCollectColumnsCxt
;
static
EDealRes
doCollect
(
SCollectColumnsCxt
*
pCxt
,
SColumnNode
*
pCol
,
SNode
*
pNode
)
{
...
...
@@ -1057,25 +1058,33 @@ static EDealRes doCollect(SCollectColumnsCxt* pCxt, SColumnNode* pCol, SNode* pN
if
(
NULL
==
taosHashGet
(
pCxt
->
pColHash
,
name
,
len
))
{
pCxt
->
errCode
=
taosHashPut
(
pCxt
->
pColHash
,
name
,
len
,
NULL
,
0
);
if
(
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
)
{
pCxt
->
errCode
=
nodesList
Append
(
pCxt
->
pCols
,
pNode
);
pCxt
->
errCode
=
nodesList
StrictAppend
(
pCxt
->
pCols
,
nodesCloneNode
(
pNode
)
);
}
return
(
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
?
DEAL_RES_IGNORE_CHILD
:
DEAL_RES_ERROR
);
}
return
DEAL_RES_CONTINUE
;
}
static
bool
isCollectType
(
ECollectColType
collectType
,
EColumnType
colType
)
{
return
COLLECT_COL_TYPE_ALL
==
collectType
?
true
:
(
COLLECT_COL_TYPE_TAG
==
collectType
?
COLUMN_TYPE_TAG
==
colType
:
COLUMN_TYPE_TAG
!=
colType
);
}
static
EDealRes
collectColumns
(
SNode
*
pNode
,
void
*
pContext
)
{
SCollectColumnsCxt
*
pCxt
=
(
SCollectColumnsCxt
*
)
pContext
;
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
if
(
NULL
==
pCxt
->
pTableAlias
||
0
==
strcmp
(
pCxt
->
pTableAlias
,
pCol
->
tableAlias
))
{
if
(
isCollectType
(
pCxt
->
collectType
,
pCol
->
colType
)
&&
(
NULL
==
pCxt
->
pTableAlias
||
0
==
strcmp
(
pCxt
->
pTableAlias
,
pCol
->
tableAlias
)))
{
return
doCollect
(
pCxt
,
pCol
,
pNode
);
}
}
return
DEAL_RES_CONTINUE
;
}
int32_t
nodesCollectColumns
(
SSelectStmt
*
pSelect
,
ESqlClause
clause
,
const
char
*
pTableAlias
,
SNodeList
**
pCols
)
{
int32_t
nodesCollectColumns
(
SSelectStmt
*
pSelect
,
ESqlClause
clause
,
const
char
*
pTableAlias
,
ECollectColType
type
,
SNodeList
**
pCols
)
{
if
(
NULL
==
pSelect
||
NULL
==
pCols
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1083,6 +1092,7 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
SCollectColumnsCxt
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
pTableAlias
=
pTableAlias
,
.
collectType
=
type
,
.
pCols
=
nodesMakeList
(),
.
pColHash
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
)};
if
(
NULL
==
cxt
.
pCols
||
NULL
==
cxt
.
pColHash
)
{
...
...
@@ -1092,11 +1102,11 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
nodesWalkSelectStmt
(
pSelect
,
clause
,
collectColumns
,
&
cxt
);
taosHashCleanup
(
cxt
.
pColHash
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodes
Clear
List
(
cxt
.
pCols
);
nodes
Destroy
List
(
cxt
.
pCols
);
return
cxt
.
errCode
;
}
if
(
0
==
LIST_LENGTH
(
cxt
.
pCols
))
{
nodes
Clear
List
(
cxt
.
pCols
);
nodes
Destroy
List
(
cxt
.
pCols
);
cxt
.
pCols
=
NULL
;
}
*
pCols
=
cxt
.
pCols
;
...
...
@@ -1123,10 +1133,12 @@ int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNod
return
TSDB_CODE_SUCCESS
;
}
SCollectFuncsCxt
cxt
=
{.
errCode
=
TSDB_CODE_SUCCESS
,
.
classifier
=
classifier
,
.
pFuncs
=
nodesMakeList
()};
SCollectFuncsCxt
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
classifier
=
classifier
,
.
pFuncs
=
(
NULL
==
*
pFuncs
?
nodesMakeList
()
:
*
pFuncs
)};
if
(
NULL
==
cxt
.
pFuncs
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
*
pFuncs
=
NULL
;
nodesWalkSelectStmt
(
pSelect
,
SQL_CLAUSE_GROUP_BY
,
collectFuncs
,
&
cxt
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodesDestroyList
(
cxt
.
pFuncs
);
...
...
@@ -1136,7 +1148,6 @@ int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNod
*
pFuncs
=
cxt
.
pFuncs
;
}
else
{
nodesDestroyList
(
cxt
.
pFuncs
);
*
pFuncs
=
NULL
;
}
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
928c60e2
...
...
@@ -1905,18 +1905,9 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray) {
SColumnDefNode
*
pCol
=
(
SColumnDefNode
*
)
pNode
;
SField
field
=
{.
type
=
pCol
->
dataType
.
type
,
.
bytes
=
calcTypeBytes
(
pCol
->
dataType
)};
strcpy
(
field
.
name
,
pCol
->
colName
);
taosArrayPush
(
*
pArray
,
&
field
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
columnNodeToField
(
SNodeList
*
pList
,
SArray
**
pArray
)
{
*
pArray
=
taosArrayInit
(
LIST_LENGTH
(
pList
),
sizeof
(
SField
));
SNode
*
pNode
;
FOREACH
(
pNode
,
pList
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
SField
field
=
{.
type
=
pCol
->
node
.
resType
.
type
,
.
bytes
=
calcTypeBytes
(
pCol
->
node
.
resType
)};
strcpy
(
field
.
name
,
pCol
->
colName
);
if
(
pCol
->
sma
)
{
field
.
flags
|=
SCHEMA_SMA_ON
;
}
taosArrayPush
(
*
pArray
,
&
field
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2249,13 +2240,6 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
columnDefNodeToField
(
pStmt
->
pTags
,
&
pReq
->
pTags
);
pReq
->
numOfColumns
=
LIST_LENGTH
(
pStmt
->
pCols
);
pReq
->
numOfTags
=
LIST_LENGTH
(
pStmt
->
pTags
);
if
(
NULL
==
pStmt
->
pOptions
->
pSma
)
{
columnDefNodeToField
(
pStmt
->
pCols
,
&
pReq
->
pSmas
);
pReq
->
numOfSmas
=
pReq
->
numOfColumns
;
}
else
{
columnNodeToField
(
pStmt
->
pOptions
->
pSma
,
&
pReq
->
pSmas
);
pReq
->
numOfSmas
=
LIST_LENGTH
(
pStmt
->
pOptions
->
pSma
);
}
SName
tableName
;
tNameExtractFullName
(
toName
(
pCxt
->
pParseCxt
->
acctId
,
pStmt
->
dbName
,
pStmt
->
tableName
,
&
tableName
),
pReq
->
name
);
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
928c60e2
...
...
@@ -129,14 +129,66 @@ static int32_t createChildLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelec
return
code
;
}
static
EScanType
getScanType
(
SLogicPlanContext
*
pCxt
,
SNodeList
*
pScanCols
,
STableMeta
*
pMeta
)
{
typedef
struct
SCreateColumnCxt
{
int32_t
errCode
;
SNodeList
*
pList
;
}
SCreateColumnCxt
;
static
EDealRes
doCreateColumn
(
SNode
*
pNode
,
void
*
pContext
)
{
SCreateColumnCxt
*
pCxt
=
(
SCreateColumnCxt
*
)
pContext
;
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_COLUMN
:
{
SNode
*
pCol
=
nodesCloneNode
(
pNode
);
if
(
NULL
==
pCol
)
{
return
DEAL_RES_ERROR
;
}
return
(
TSDB_CODE_SUCCESS
==
nodesListAppend
(
pCxt
->
pList
,
pCol
)
?
DEAL_RES_IGNORE_CHILD
:
DEAL_RES_ERROR
);
}
case
QUERY_NODE_OPERATOR
:
case
QUERY_NODE_LOGIC_CONDITION
:
case
QUERY_NODE_FUNCTION
:
{
SExprNode
*
pExpr
=
(
SExprNode
*
)
pNode
;
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
return
DEAL_RES_ERROR
;
}
pCol
->
node
.
resType
=
pExpr
->
resType
;
strcpy
(
pCol
->
colName
,
pExpr
->
aliasName
);
return
(
TSDB_CODE_SUCCESS
==
nodesListAppend
(
pCxt
->
pList
,
pCol
)
?
DEAL_RES_IGNORE_CHILD
:
DEAL_RES_ERROR
);
}
default:
break
;
}
return
DEAL_RES_CONTINUE
;
}
static
int32_t
createColumnByRewriteExps
(
SLogicPlanContext
*
pCxt
,
SNodeList
*
pExprs
,
SNodeList
**
pList
)
{
SCreateColumnCxt
cxt
=
{.
errCode
=
TSDB_CODE_SUCCESS
,
.
pList
=
(
NULL
==
*
pList
?
nodesMakeList
()
:
*
pList
)};
if
(
NULL
==
cxt
.
pList
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
nodesWalkExprs
(
pExprs
,
doCreateColumn
,
&
cxt
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodesDestroyList
(
cxt
.
pList
);
return
cxt
.
errCode
;
}
if
(
NULL
==
*
pList
)
{
*
pList
=
cxt
.
pList
;
}
return
cxt
.
errCode
;
}
static
EScanType
getScanType
(
SLogicPlanContext
*
pCxt
,
SNodeList
*
pScanPseudoCols
,
SNodeList
*
pScanCols
,
STableMeta
*
pMeta
)
{
if
(
pCxt
->
pPlanCxt
->
topicQuery
||
pCxt
->
pPlanCxt
->
streamQuery
)
{
return
SCAN_TYPE_STREAM
;
}
if
(
NULL
==
pScanCols
)
{
// select count(*) from t
return
SCAN_TYPE_TABLE
;
return
NULL
==
pScanPseudoCols
?
SCAN_TYPE_TABLE
:
SCAN_TYPE_TAG
;
}
if
(
TSDB_SYSTEM_TABLE
==
pMeta
->
tableType
)
{
...
...
@@ -186,7 +238,6 @@ static int32_t addPrimaryKeyCol(uint64_t tableId, SNodeList** pCols) {
if
(
!
found
)
{
if
(
TSDB_CODE_SUCCESS
!=
nodesListStrictAppend
(
*
pCols
,
createPrimaryKeyCol
(
tableId
)))
{
nodesDestroyList
(
*
pCols
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
...
...
@@ -214,30 +265,31 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan
->
dataRequired
=
FUNC_DATA_REQUIRED_DATA_LOAD
;
// set columns to scan
SNodeList
*
pCols
=
NULL
;
int32_t
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_FROM
,
pRealTable
->
table
.
tableAlias
,
&
pCols
);
int32_t
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_FROM
,
pRealTable
->
table
.
tableAlias
,
COLLECT_COL_TYPE_COL
,
&
pScan
->
pScanCols
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addPrimaryKeyCol
(
pScan
->
pMeta
->
uid
,
&
pCols
);
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_FROM
,
pRealTable
->
table
.
tableAlias
,
COLLECT_COL_TYPE_TAG
,
&
pScan
->
pScanPseudoCols
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pScan
->
pScanCols
=
nodesCloneList
(
pCols
);
if
(
NULL
==
pScan
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
code
=
nodesCollectFuncs
(
pSelect
,
fmIsScanPseudoColumnFunc
,
&
pScan
->
pScanPseudoCols
);
}
pScan
->
scanType
=
getScanType
(
pCxt
,
pCols
,
pScan
->
pMeta
);
pScan
->
scanType
=
getScanType
(
pCxt
,
p
Scan
->
pScanPseudoCols
,
pScan
->
pScan
Cols
,
pScan
->
pMeta
);
// set output
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pScan
->
node
.
pTargets
=
nodesCloneList
(
pCols
);
if
(
NULL
==
pScan
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
code
=
addPrimaryKeyCol
(
pScan
->
pMeta
->
uid
,
&
pScan
->
pScanCols
);
}
nodesClearList
(
pCols
);
// set output
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
pCxt
,
pScan
->
pScanCols
,
&
pScan
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByRewriteExps
(
pCxt
,
pScan
->
pScanPseudoCols
,
&
pScan
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pLogicNode
=
(
SLogicNode
*
)
pScan
;
...
...
@@ -362,57 +414,6 @@ static SColumnNode* createColumnByExpr(const char* pStmtName, SExprNode* pExpr)
return
pCol
;
}
typedef
struct
SCreateColumnCxt
{
int32_t
errCode
;
SNodeList
*
pList
;
}
SCreateColumnCxt
;
static
EDealRes
doCreateColumn
(
SNode
*
pNode
,
void
*
pContext
)
{
SCreateColumnCxt
*
pCxt
=
(
SCreateColumnCxt
*
)
pContext
;
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_COLUMN
:
{
SNode
*
pCol
=
nodesCloneNode
(
pNode
);
if
(
NULL
==
pCol
)
{
return
DEAL_RES_ERROR
;
}
return
(
TSDB_CODE_SUCCESS
==
nodesListAppend
(
pCxt
->
pList
,
pCol
)
?
DEAL_RES_IGNORE_CHILD
:
DEAL_RES_ERROR
);
}
case
QUERY_NODE_OPERATOR
:
case
QUERY_NODE_LOGIC_CONDITION
:
case
QUERY_NODE_FUNCTION
:
{
SExprNode
*
pExpr
=
(
SExprNode
*
)
pNode
;
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
return
DEAL_RES_ERROR
;
}
pCol
->
node
.
resType
=
pExpr
->
resType
;
strcpy
(
pCol
->
colName
,
pExpr
->
aliasName
);
return
(
TSDB_CODE_SUCCESS
==
nodesListAppend
(
pCxt
->
pList
,
pCol
)
?
DEAL_RES_IGNORE_CHILD
:
DEAL_RES_ERROR
);
}
default:
break
;
}
return
DEAL_RES_CONTINUE
;
}
static
int32_t
createColumnByRewriteExps
(
SLogicPlanContext
*
pCxt
,
SNodeList
*
pExprs
,
SNodeList
**
pList
)
{
SCreateColumnCxt
cxt
=
{.
errCode
=
TSDB_CODE_SUCCESS
,
.
pList
=
(
NULL
==
*
pList
?
nodesMakeList
()
:
*
pList
)};
if
(
NULL
==
cxt
.
pList
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
nodesWalkExprs
(
pExprs
,
doCreateColumn
,
&
cxt
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodesDestroyList
(
cxt
.
pList
);
return
cxt
.
errCode
;
}
if
(
NULL
==
*
pList
)
{
*
pList
=
cxt
.
pList
;
}
return
cxt
.
errCode
;
}
static
int32_t
createAggLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SLogicNode
**
pLogicNode
)
{
if
(
!
pSelect
->
hasAggFuncs
&&
NULL
==
pSelect
->
pGroupByList
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -598,14 +599,7 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SNodeList
*
pCols
=
NULL
;
int32_t
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_ORDER_BY
,
NULL
,
&
pCols
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pCols
)
{
pSort
->
node
.
pTargets
=
nodesCloneList
(
pCols
);
if
(
NULL
==
pSort
->
node
.
pTargets
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
int32_t
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_ORDER_BY
,
NULL
,
COLLECT_COL_TYPE_ALL
,
&
pSort
->
node
.
pTargets
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pSort
->
pSortKeys
=
nodesCloneList
(
pSelect
->
pOrderByList
);
...
...
@@ -695,14 +689,8 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SNodeList
*
pCols
=
NULL
;
int32_t
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_PARTITION_BY
,
NULL
,
&
pCols
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pCols
)
{
pPartition
->
node
.
pTargets
=
nodesCloneList
(
pCols
);
if
(
NULL
==
pPartition
->
node
.
pTargets
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
int32_t
code
=
nodesCollectColumns
(
pSelect
,
SQL_CLAUSE_PARTITION_BY
,
NULL
,
COLLECT_COL_TYPE_ALL
,
&
pPartition
->
node
.
pTargets
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pPartition
->
pPartitionKeys
=
nodesCloneList
(
pSelect
->
pPartitionByList
);
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
928c60e2
...
...
@@ -380,6 +380,10 @@ static int32_t sortScanCols(SNodeList* pScanCols) {
}
static
int32_t
createScanCols
(
SPhysiPlanContext
*
pCxt
,
SScanPhysiNode
*
pScanPhysiNode
,
SNodeList
*
pScanCols
)
{
if
(
NULL
==
pScanCols
)
{
return
TSDB_CODE_SUCCESS
;
}
pScanPhysiNode
->
pScanCols
=
nodesCloneList
(
pScanCols
);
if
(
NULL
==
pScanPhysiNode
->
pScanCols
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -394,9 +398,22 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNo
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
code
=
addDataBlockSlots
(
pCxt
,
pScanPhysiNode
->
pScanCols
,
pScanPhysiNode
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pScanLogicNode
->
pScanPseudoCols
)
{
pScanPhysiNode
->
pScanPseudoCols
=
nodesCloneList
(
pScanLogicNode
->
pScanPseudoCols
);
if
(
NULL
==
pScanPhysiNode
->
pScanPseudoCols
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
addDataBlockSlots
(
pCxt
,
pScanPhysiNode
->
pScanPseudoCols
,
pScanPhysiNode
->
node
.
pOutputDataBlockDesc
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
setConditionsSlotId
(
pCxt
,
(
const
SLogicNode
*
)
pScanLogicNode
,
(
SPhysiNode
*
)
pScanPhysiNode
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pScanPhysiNode
->
uid
=
pScanLogicNode
->
pMeta
->
uid
;
pScanPhysiNode
->
tableType
=
pScanLogicNode
->
pMeta
->
tableType
;
...
...
@@ -1190,6 +1207,11 @@ static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubp
++
(
pQueryPlan
->
numOfSubplans
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyNode
(
pSubplan
);
return
code
;
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pParent
)
{
code
=
nodesListMakeAppend
(
&
pParent
->
pChildren
,
pSubplan
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -1207,10 +1229,6 @@ static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubp
}
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyNode
(
pSubplan
);
}
return
code
;
}
...
...
source/libs/planner/test/planSTableTest.cpp
浏览文件 @
928c60e2
...
...
@@ -19,7 +19,7 @@ using namespace std;
class
PlanSuperTableTest
:
public
PlannerTestBase
{};
TEST_F
(
PlanSuperTableTest
,
unionAll
)
{
TEST_F
(
PlanSuperTableTest
,
tbname
)
{
useDb
(
"root"
,
"test"
);
run
(
"select tbname from st1"
);
...
...
tests/script/sh/massiveTable/compileVersion.sh
浏览文件 @
928c60e2
...
...
@@ -44,7 +44,7 @@ function gitPullBranchInfo () {
## git submodule update --init --recursive
git pull origin
$branch_name
||
:
echo
"==== git pull
$branch_name
end ===="
git pull
--recurse-submodules
#
git pull --recurse-submodules
}
function
compileTDengineVersion
()
{
...
...
tests/script/tsim/db/alter_option.sim
浏览文件 @
928c60e2
...
...
@@ -126,6 +126,49 @@ if $data16_db != ns then # precision
return -1
endi
sleep 3000
sql show db.vgroups
if $data[0][4] == LEADER then
if $data[0][6] != FOLLOWER then
return -1
endi
if $data[0][8] != FOLLOWER then
return -1
endi
endi
if $data[0][6] == LEADER then
if $data[0][4] != FOLLOWER then
return -1
endi
if $data[0][8] != FOLLOWER then
return -1
endi
endi
if $data[0][8] == LEADER then
if $data[0][4] != FOLLOWER then
return -1
endi
if $data[0][6] != FOLLOWER then
return -1
endi
endi
if $data[0][4] != LEADER then
if $data[0][4] != FOLLOWER then
return -1
endi
endi
if $data[0][6] != LEADER then
if $data[0][6] != FOLLOWER then
return -1
endi
endi
if $data[0][8] != LEADER then
if $data[0][8] != FOLLOWER then
return -1
endi
endi
print ============== not support modify options: name, create_time, vgroups, ntables
sql_error alter database db name dba
sql_error alter database db create_time "2022-03-03 15:08:13.329"
...
...
tools/shell/inc/shellInt.h
浏览文件 @
928c60e2
...
...
@@ -85,7 +85,6 @@ typedef struct {
TAOS
*
conn
;
TdThread
pid
;
tsem_t
cancelSem
;
int64_t
result
;
}
SShellObj
;
// shellArguments.c
...
...
tools/shell/src/shellEngine.c
浏览文件 @
928c60e2
...
...
@@ -213,13 +213,10 @@ void shellRunSingleCommandImp(char *command) {
return
;
}
int64_t
oresult
=
atomic_load_64
(
&
shell
.
result
);
if
(
shellRegexMatch
(
command
,
"^
\\
s*use
\\
s+[a-zA-Z0-9_]+
\\
s*;
\\
s*$"
,
REG_EXTENDED
|
REG_ICASE
))
{
fprintf
(
stdout
,
"Database changed.
\n\n
"
);
fflush
(
stdout
);
atomic_store_64
(
&
shell
.
result
,
0
);
taos_free_result
(
pSql
);
return
;
...
...
@@ -230,10 +227,7 @@ void shellRunSingleCommandImp(char *command) {
int32_t
error_no
=
0
;
int32_t
numOfRows
=
shellDumpResult
(
pSql
,
fname
,
&
error_no
,
printMode
);
if
(
numOfRows
<
0
)
{
atomic_store_64
(
&
shell
.
result
,
0
);
return
;
}
if
(
numOfRows
<
0
)
return
;
et
=
taosGetTimestampUs
();
if
(
error_no
==
0
)
{
...
...
@@ -250,8 +244,6 @@ void shellRunSingleCommandImp(char *command) {
}
printf
(
"
\n
"
);
atomic_store_64
(
&
shell
.
result
,
0
);
}
char
*
shellFormatTimestamp
(
char
*
buf
,
int64_t
val
,
int32_t
precision
)
{
...
...
@@ -398,7 +390,6 @@ int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) {
row
=
taos_fetch_row
(
tres
);
}
while
(
row
!=
NULL
);
atomic_store_64
(
&
shell
.
result
,
0
);
taosCloseFile
(
&
pFile
);
return
numOfRows
;
...
...
@@ -766,7 +757,6 @@ void shellWriteHistory() {
void
shellPrintError
(
TAOS_RES
*
tres
,
int64_t
st
)
{
int64_t
et
=
taosGetTimestampUs
();
atomic_store_ptr
(
&
shell
.
result
,
0
);
fprintf
(
stderr
,
"
\n
DB error: %s (%.6fs)
\n
"
,
taos_errstr
(
tres
),
(
et
-
st
)
/
1E6
);
taos_free_result
(
tres
);
}
...
...
@@ -872,7 +862,6 @@ void shellGetGrantInfo() {
fprintf
(
stdout
,
"Server is Enterprise %s Edition, %s and will expire at %s.
\n
"
,
serverVersion
,
sinfo
,
expiretime
);
}
atomic_store_64
(
&
shell
.
result
,
0
);
taos_free_result
(
tres
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录