Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
352965f7
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
352965f7
编写于
7月 25, 2022
作者:
P
plum-lihui
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
上级
3614db50
aefdf8bb
变更
31
隐藏空白更改
内联
并排
Showing
31 changed file
with
501 addition
and
273 deletion
+501
-273
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+5
-4
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+1
-0
include/libs/wal/wal.h
include/libs/wal/wal.h
+19
-15
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+2
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+22
-6
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+9
-2
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+8
-7
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+42
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+14
-9
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+13
-13
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+34
-67
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+2
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+2
-2
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+1
-1
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+2
-2
source/libs/parser/inc/parAst.h
source/libs/parser/inc/parAst.h
+1
-1
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+3
-1
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+8
-6
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+1
-1
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+5
-3
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+38
-19
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+5
-3
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+58
-13
source/libs/planner/src/planUtil.c
source/libs/planner/src/planUtil.c
+2
-1
source/libs/planner/test/planBasicTest.cpp
source/libs/planner/test/planBasicTest.cpp
+4
-3
source/libs/planner/test/planOptimizeTest.cpp
source/libs/planner/test/planOptimizeTest.cpp
+2
-0
source/libs/wal/src/walMgmt.c
source/libs/wal/src/walMgmt.c
+1
-1
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+84
-79
source/libs/wal/src/walRef.c
source/libs/wal/src/walRef.c
+89
-0
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+23
-12
source/util/src/terror.c
source/util/src/terror.c
+1
-1
未找到文件。
include/libs/nodes/plannodes.h
浏览文件 @
352965f7
...
...
@@ -104,6 +104,7 @@ typedef struct SJoinLogicNode {
SNode
*
pMergeCondition
;
SNode
*
pOnConditions
;
bool
isSingleTableJoin
;
EOrder
inputTsOrder
;
}
SJoinLogicNode
;
typedef
struct
SAggLogicNode
{
...
...
@@ -201,6 +202,7 @@ typedef struct SWindowLogicNode {
int64_t
watermark
;
int8_t
igExpired
;
EWindowAlgorithm
windowAlgo
;
EOrder
inputTsOrder
;
}
SWindowLogicNode
;
typedef
struct
SFillLogicNode
{
...
...
@@ -356,15 +358,14 @@ typedef struct SInterpFuncPhysiNode {
SNode
*
pTimeSeries
;
// SColumnNode
}
SInterpFuncPhysiNode
;
typedef
struct
SJoinPhysiNode
{
typedef
struct
S
SortMerge
JoinPhysiNode
{
SPhysiNode
node
;
EJoinType
joinType
;
SNode
*
pMergeCondition
;
SNode
*
pOnConditions
;
SNodeList
*
pTargets
;
}
SJoinPhysiNode
;
typedef
SJoinPhysiNode
SSortMergeJoinPhysiNode
;
EOrder
inputTsOrder
;
}
SSortMergeJoinPhysiNode
;
typedef
struct
SAggPhysiNode
{
SPhysiNode
node
;
...
...
include/libs/nodes/querynodes.h
浏览文件 @
352965f7
...
...
@@ -255,6 +255,7 @@ typedef struct SSelectStmt {
int32_t
selectFuncNum
;
bool
isEmptyResult
;
bool
isTimeLineResult
;
bool
isSubquery
;
bool
hasAggFuncs
;
bool
hasRepeatScanFuncs
;
bool
hasIndefiniteRowsFunc
;
...
...
include/libs/wal/wal.h
浏览文件 @
352965f7
...
...
@@ -114,21 +114,30 @@ typedef struct SWal {
int64_t
refId
;
TdThreadMutex
mutex
;
// ref
SHashObj
*
pRefHash
;
// ref -> SWalRef
SHashObj
*
pRefHash
;
// ref
Id
-> SWalRef
// path
char
path
[
WAL_PATH_LEN
];
// reusable write head
SWalCkHead
writeHead
;
}
SWal
;
// WAL HANDLE
}
SWal
;
typedef
struct
{
int64_t
refId
;
int64_t
refVer
;
int64_t
refFile
;
SWal
*
pWal
;
}
SWalRef
;
typedef
struct
{
int8_t
scanUncommited
;
int8_t
scanNotApplied
;
int8_t
scanMeta
;
int8_t
enableRef
;
}
SWalFilterCond
;
typedef
struct
{
SWal
*
pWal
;
int64_t
readerId
;
TdFilePtr
pLogFile
;
TdFilePtr
pIdxFile
;
int64_t
curFileFirstVer
;
...
...
@@ -138,7 +147,8 @@ typedef struct {
int8_t
curStopped
;
TdThreadMutex
mutex
;
SWalFilterCond
cond
;
SWalCkHead
*
pHead
;
// TODO remove it
SWalCkHead
*
pHead
;
}
SWalReader
;
// module initialization
...
...
@@ -157,11 +167,7 @@ int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_
int32_t
walWriteWithSyncInfo
(
SWal
*
,
int64_t
index
,
tmsg_t
msgType
,
SWalSyncInfo
syncMeta
,
const
void
*
body
,
int32_t
bodyLen
);
// This interface assign version automatically and return to caller.
// When using this interface with concurrent writes,
// wal will write all logs atomically,
// but not sure which one will be actually write first,
// and then the unique index of successful writen is returned.
// Assign version automatically and return to caller,
// -1 will be returned for failed writes
int64_t
walAppendLog
(
SWal
*
,
tmsg_t
msgType
,
SWalSyncInfo
syncMeta
,
const
void
*
body
,
int32_t
bodyLen
);
...
...
@@ -191,17 +197,15 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
);
int32_t
walFetchBody
(
SWalReader
*
pRead
,
SWalCkHead
**
ppHead
);
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
,
const
SWalCkHead
*
pHead
);
typedef
struct
{
int64_t
refId
;
int64_t
ver
;
}
SWalRef
;
SWalRef
*
walRefCommittedVer
(
SWal
*
);
SWalRef
*
walOpenRef
(
SWal
*
);
void
walCloseRef
(
SWal
Ref
*
);
void
walCloseRef
(
SWal
*
pWal
,
int64_t
refId
);
int32_t
walRefVer
(
SWalRef
*
,
int64_t
ver
);
int32_t
walUnrefVer
(
SWal
*
);
void
walUnrefVer
(
SWalRef
*
);
// help function for raft
// help
er
function for raft
bool
walLogExist
(
SWal
*
,
int64_t
ver
);
bool
walIsEmpty
(
SWal
*
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
352965f7
...
...
@@ -104,6 +104,8 @@ typedef struct {
// TODO remove
SWalReader
*
pWalReader
;
SWalRef
*
pRef
;
// push
STqPushHandle
pushHandle
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
352965f7
...
...
@@ -212,6 +212,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT
(
0
);
return
-
1
;
}
if
(
offset
.
val
.
type
==
TMQ_OFFSET__LOG
)
{
STqHandle
*
pHandle
=
taosHashGet
(
pTq
->
handles
,
offset
.
subKey
,
strlen
(
offset
.
subKey
));
if
(
walRefVer
(
pHandle
->
pRef
,
offset
.
val
.
version
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
/*}*/
/*}*/
...
...
@@ -376,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
if
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__COLUMN
)
{
int64_t
fetchVer
=
fetchOffsetNew
.
version
+
1
;
SWalCkHead
*
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
int64_t
fetchVer
=
fetchOffsetNew
.
version
+
1
;
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
if
(
pCkHead
==
NULL
)
{
code
=
-
1
;
goto
OVER
;
...
...
@@ -534,11 +543,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle
->
execHandle
.
subType
=
req
.
subType
;
pHandle
->
fetchMeta
=
req
.
withMeta
;
// TODO version should be assigned and refed during preprocess
SWalRef
*
pRef
=
walRefCommittedVer
(
pTq
->
pVnode
->
pWal
);
if
(
pRef
==
NULL
)
{
ASSERT
(
0
);
}
int64_t
ver
=
pRef
->
refVer
;
pHandle
->
pRef
=
pRef
;
pHandle
->
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
// TODO version should be assigned in preprocess
int64_t
ver
=
walGetCommittedVer
(
pTq
->
pVnode
->
pWal
);
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
pHandle
->
execHandle
.
execCol
.
qmsg
=
req
.
qmsg
;
pHandle
->
snapshotVer
=
ver
;
...
...
@@ -560,10 +572,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle
->
execHandle
.
pExecReader
=
qExtractReaderFromStreamScanner
(
scanner
);
ASSERT
(
pHandle
->
execHandle
.
pExecReader
);
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__DB
)
{
pHandle
->
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
pHandle
->
execHandle
.
pExecReader
=
tqOpenReader
(
pTq
->
pVnode
);
pHandle
->
execHandle
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
pHandle
->
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
pHandle
->
execHandle
.
execTb
.
suid
=
req
.
suid
;
SArray
*
tbUidList
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
vnodeGetCtbIdList
(
pTq
->
pVnode
,
req
.
suid
,
tbUidList
);
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
352965f7
...
...
@@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) {
ASSERT
(
0
);
}
TXN
txn
;
TXN
txn
=
{
0
}
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
0
)
<
0
)
{
ASSERT
(
0
);
...
...
@@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) {
STqHandle
handle
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
pVal
,
vLen
);
tDecodeSTqHandle
(
&
decoder
,
&
handle
);
handle
.
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
handle
.
pRef
=
walOpenRef
(
pTq
->
pVnode
->
pWal
);
if
(
handle
.
pRef
==
NULL
)
{
ASSERT
(
0
);
}
walRefVer
(
handle
.
pRef
,
handle
.
snapshotVer
);
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
SReadHandle
reader
=
{
.
meta
=
pTq
->
pVnode
->
pMeta
,
...
...
@@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) {
handle
.
execHandle
.
pExecReader
=
qExtractReaderFromStreamScanner
(
scanner
);
ASSERT
(
handle
.
execHandle
.
pExecReader
);
}
else
{
handle
.
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
handle
.
execHandle
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
...
...
source/libs/command/src/explain.c
浏览文件 @
352965f7
...
...
@@ -135,7 +135,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
{
S
JoinPhysiNode
*
pJoinNode
=
(
S
JoinPhysiNode
*
)
pNode
;
S
SortMergeJoinPhysiNode
*
pJoinNode
=
(
SSortMerge
JoinPhysiNode
*
)
pNode
;
pPhysiChildren
=
pJoinNode
->
node
.
pChildren
;
break
;
}
...
...
@@ -434,7 +434,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
case
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
:
{
STableScanPhysiNode
*
pTblScanNode
=
(
STableScanPhysiNode
*
)
pNode
;
EXPLAIN_ROW_NEW
(
level
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
==
pNode
->
type
?
EXPLAIN_TBL_MERGE_SCAN_FORMAT
:
EXPLAIN_TBL_SCAN_FORMAT
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
==
pNode
->
type
?
EXPLAIN_TBL_MERGE_SCAN_FORMAT
:
EXPLAIN_TBL_SCAN_FORMAT
,
pTblScanNode
->
scan
.
tableName
.
tname
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_LEFT_PARENTHESIS_FORMAT
);
if
(
pResNode
->
pExecInfo
)
{
...
...
@@ -551,7 +552,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
if
(
pSTblScanNode
->
scan
.
pScanPseudoCols
)
{
EXPLAIN_ROW_APPEND
(
EXPLAIN_PSEUDO_COLUMNS_FORMAT
,
pSTblScanNode
->
scan
.
pScanPseudoCols
->
length
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
}
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pSTblScanNode
->
scan
.
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
...
...
@@ -613,7 +614,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
{
S
JoinPhysiNode
*
pJoinNode
=
(
S
JoinPhysiNode
*
)
pNode
;
S
SortMergeJoinPhysiNode
*
pJoinNode
=
(
SSortMerge
JoinPhysiNode
*
)
pNode
;
EXPLAIN_ROW_NEW
(
level
,
EXPLAIN_JOIN_FORMAT
,
EXPLAIN_JOIN_STRING
(
pJoinNode
->
joinType
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_LEFT_PARENTHESIS_FORMAT
);
if
(
pResNode
->
pExecInfo
)
{
...
...
@@ -1180,7 +1181,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
if
(
pDistScanNode
->
pScanPseudoCols
)
{
EXPLAIN_ROW_APPEND
(
EXPLAIN_PSEUDO_COLUMNS_FORMAT
,
pDistScanNode
->
pScanPseudoCols
->
length
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
}
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pDistScanNode
->
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
...
...
@@ -1367,7 +1368,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_FUNCTIONS_FORMAT
,
pInterpNode
->
pFuncs
->
length
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
}
EXPLAIN_ROW_APPEND
(
EXPLAIN_MODE_FORMAT
,
nodesGetFillModeString
(
pInterpNode
->
fillMode
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
...
...
@@ -1419,7 +1420,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
}
break
;
}
}
default:
qError
(
"not supported physical node type %d"
,
pNode
->
type
);
return
TSDB_CODE_QRY_APP_ERROR
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
352965f7
...
...
@@ -320,6 +320,47 @@ typedef struct STableScanInfo {
int8_t
noTable
;
}
STableScanInfo
;
typedef
struct
STableMergeScanInfo
{
STableListInfo
*
tableListInfo
;
int32_t
tableStartIndex
;
int32_t
tableEndIndex
;
bool
hasGroupId
;
uint64_t
groupId
;
SArray
*
dataReaders
;
// array of tsdbReaderT*
SReadHandle
readHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
SFileBlockLoadRecorder
readRecorder
;
int64_t
numOfRows
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SNode
*
pFilterNode
;
// filter info, which is push down by optimizer
SqlFunctionCtx
*
pCtx
;
// which belongs to the direct upstream operator operator query context
SResultRowInfo
*
pResultRowInfo
;
int32_t
*
rowEntryInfoOffset
;
SExprInfo
*
pExpr
;
SSDataBlock
*
pResBlock
;
SArray
*
pColMatchInfo
;
int32_t
numOfOutput
;
SExprSupp
pseudoSup
;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval
interval
;
SSampleExecInfo
sample
;
// sample execution info
}
STableMergeScanInfo
;
typedef
struct
STagScanInfo
{
SColumnInfo
*
pCols
;
SSDataBlock
*
pRes
;
...
...
@@ -886,7 +927,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SJoinPhysiNode
*
pJoinNode
,
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
S
SortMerge
JoinPhysiNode
*
pJoinNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
352965f7
...
...
@@ -1325,7 +1325,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
extractQualifiedTupleByFilterResult
(
pBlock
,
rowRes
,
keep
);
if
(
pColMatchInfo
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pColMatchInfo
);
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pColMatchInfo
);
++
i
)
{
SColMatchInfo
*
pInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
if
(
pInfo
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
SColumnInfoData
*
pColData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
targetSlotId
);
...
...
@@ -1646,10 +1646,10 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total
rows:%
"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRow
s
,
pRecorder
->
totalCheckedRows
);
"%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total "
"rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStati
s
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
}
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
...
...
@@ -2783,11 +2783,16 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
*
order
=
TSDB_ORDER_ASC
;
*
scanFlag
=
MAIN_SCAN
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
||
type
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
)
{
}
else
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
*
order
=
pTableScanInfo
->
cond
.
order
;
*
scanFlag
=
pTableScanInfo
->
scanFlag
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
)
{
STableMergeScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
*
order
=
pTableScanInfo
->
cond
.
order
;
*
scanFlag
=
pTableScanInfo
->
scanFlag
;
return
TSDB_CODE_SUCCESS
;
}
else
{
if
(
pOperator
->
pDownstream
==
NULL
||
pOperator
->
pDownstream
[
0
]
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
...
...
@@ -3728,7 +3733,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
}
// this the tags and pseudo function columns, we only keep the tag columns
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
STargetNode
*
pNode
=
(
STargetNode
*
)
nodesListGetNode
(
pScanNode
->
pScanPseudoCols
,
i
);
int32_t
type
=
nodeType
(
pNode
->
pExpr
);
...
...
@@ -3844,7 +3849,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
int32_t
groupNum
=
0
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
i
++
)
{
STableKeyInfo
*
info
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
int32_t
code
=
getGroupIdFromTagsVal
(
pHandle
->
meta
,
info
->
uid
,
group
,
keyBuf
,
&
info
->
groupId
);
int32_t
code
=
getGroupIdFromTagsVal
(
pHandle
->
meta
,
info
->
uid
,
group
,
keyBuf
,
&
info
->
groupId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -4165,7 +4170,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE
==
type
)
{
pOptr
=
createStreamStateAggOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
==
type
)
{
pOptr
=
createMergeJoinOperatorInfo
(
ops
,
size
,
(
SJoinPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
pOptr
=
createMergeJoinOperatorInfo
(
ops
,
size
,
(
S
SortMerge
JoinPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_FILL
==
type
)
{
pOptr
=
createFillOperatorInfo
(
ops
[
0
],
(
SFillPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC
==
type
)
{
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
352965f7
...
...
@@ -28,30 +28,30 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static
void
destroyMergeJoinOperator
(
void
*
param
,
int32_t
numOfOutput
);
static
void
extractTimeCondition
(
SJoinOperatorInfo
*
Info
,
SLogicConditionNode
*
pLogicConditionNode
);
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SJoinPhysiNode
*
pJoinNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SSortMergeJoinPhysiNode
*
pJoinNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SJoinOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SJoinOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pOperator
==
NULL
||
pInfo
==
NULL
)
{
goto
_error
;
}
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pJoinNode
->
node
.
pOutputDataBlockDesc
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pJoinNode
->
node
.
pOutputDataBlockDesc
);
int32_t
numOfCols
=
0
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pJoinNode
->
pTargets
,
NULL
,
&
numOfCols
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
pInfo
->
pRes
=
pResBlock
;
pOperator
->
name
=
"MergeJoinOperator"
;
pInfo
->
pRes
=
pResBlock
;
pOperator
->
name
=
"MergeJoinOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
SNode
*
pMergeCondition
=
pJoinNode
->
pMergeCondition
;
if
(
nodeType
(
pMergeCondition
)
==
QUERY_NODE_OPERATOR
)
{
...
...
@@ -104,7 +104,7 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
void
destroyMergeJoinOperator
(
void
*
param
,
int32_t
numOfOutput
)
{
SJoinOperatorInfo
*
pJoinOperator
=
(
SJoinOperatorInfo
*
)
param
;
nodesDestroyNode
(
pJoinOperator
->
pCondAfterMerge
);
taosMemoryFreeClear
(
param
);
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
352965f7
...
...
@@ -274,7 +274,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
qDebug
(
"%s data block filter out, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
}
else
{
qDebug
(
"%s data block filter out, elapsed time:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
(
et
-
st
));
qDebug
(
"%s data block filter out, elapsed time:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
(
et
-
st
));
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1838,11 +1838,14 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
int8_t
tagType
=
smr
.
me
.
stbEntry
.
schemaTag
.
pSchema
[
i
].
type
;
pColInfoData
=
taosArrayGet
(
p
->
pDataBlock
,
4
);
char
tagTypeStr
[
VARSTR_HEADER_SIZE
+
32
];
int
tagTypeLen
=
sprintf
(
varDataVal
(
tagTypeStr
),
"%s"
,
tDataTypes
[
tagType
].
name
);
int
tagTypeLen
=
sprintf
(
varDataVal
(
tagTypeStr
),
"%s"
,
tDataTypes
[
tagType
].
name
);
if
(
tagType
==
TSDB_DATA_TYPE_VARCHAR
)
{
tagTypeLen
+=
sprintf
(
varDataVal
(
tagTypeStr
)
+
tagTypeLen
,
"(%d)"
,
(
int32_t
)(
smr
.
me
.
stbEntry
.
schemaTag
.
pSchema
[
i
].
bytes
-
VARSTR_HEADER_SIZE
));
tagTypeLen
+=
sprintf
(
varDataVal
(
tagTypeStr
)
+
tagTypeLen
,
"(%d)"
,
(
int32_t
)(
smr
.
me
.
stbEntry
.
schemaTag
.
pSchema
[
i
].
bytes
-
VARSTR_HEADER_SIZE
));
}
else
if
(
tagType
==
TSDB_DATA_TYPE_NCHAR
)
{
tagTypeLen
+=
sprintf
(
varDataVal
(
tagTypeStr
)
+
tagTypeLen
,
"(%d)"
,
(
int32_t
)((
smr
.
me
.
stbEntry
.
schemaTag
.
pSchema
[
i
].
bytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
));
tagTypeLen
+=
sprintf
(
varDataVal
(
tagTypeStr
)
+
tagTypeLen
,
"(%d)"
,
(
int32_t
)((
smr
.
me
.
stbEntry
.
schemaTag
.
pSchema
[
i
].
bytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
));
}
varDataSetLen
(
tagTypeStr
,
tagTypeLen
);
colDataAppend
(
pColInfoData
,
numOfRows
,
(
char
*
)
tagTypeStr
,
false
);
...
...
@@ -2527,49 +2530,6 @@ _error:
return
NULL
;
}
typedef
struct
STableMergeScanInfo
{
STableListInfo
*
tableListInfo
;
int32_t
tableStartIndex
;
int32_t
tableEndIndex
;
bool
hasGroupId
;
uint64_t
groupId
;
SArray
*
dataReaders
;
// array of tsdbReaderT*
SReadHandle
readHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
SFileBlockLoadRecorder
readRecorder
;
int64_t
numOfRows
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SNode
*
pFilterNode
;
// filter info, which is push down by optimizer
SqlFunctionCtx
*
pCtx
;
// which belongs to the direct upstream operator operator query context
SResultRowInfo
*
pResultRowInfo
;
int32_t
*
rowEntryInfoOffset
;
SExprInfo
*
pExpr
;
SSDataBlock
*
pResBlock
;
SArray
*
pColMatchInfo
;
int32_t
numOfOutput
;
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
SqlFunctionCtx
*
pPseudoCtx
;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval
interval
;
SSampleExecInfo
sample
;
// sample execution info
}
STableMergeScanInfo
;
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
idStr
)
{
...
...
@@ -2700,9 +2660,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
relocateColumnData
(
pBlock
,
pTableScanInfo
->
pColMatchInfo
,
pCols
,
true
);
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
numOfPseudoExpr
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
p
PseudoExpr
,
pTableScanInfo
->
numOfPseudoExpr
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
p
seudoSup
.
pExprInfo
,
pTableScanInfo
->
pseudoSup
.
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -2869,29 +2829,31 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
tsortDestroySortHandle
(
pInfo
->
pSortHandle
);
size_t
numReaders
=
taosArrayGetSize
(
pInfo
->
dataReaders
);
for
(
int32_t
i
=
0
;
i
<
numReaders
;
++
i
)
{
STableMergeScanSortSourceParam
*
param
=
taosArrayGet
(
pInfo
->
sortSourceParams
,
i
);
blockDataDestroy
(
param
->
inputBlock
);
}
taosArrayClear
(
pInfo
->
sortSourceParams
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
dataReaders
);
++
i
)
{
tsortDestroySortHandle
(
pInfo
->
pSortHandle
);
for
(
int32_t
i
=
0
;
i
<
numReaders
;
++
i
)
{
STsdbReader
*
reader
=
taosArrayGetP
(
pInfo
->
dataReaders
,
i
);
tsdbReaderClose
(
reader
);
}
taosArrayDestroy
(
pInfo
->
dataReaders
);
pInfo
->
dataReaders
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
SSDataBlock
*
getSortedTableMergeScanBlockData
(
SSortHandle
*
pHandle
,
int32_t
capacity
,
SOperatorInfo
*
pOperator
)
{
SSDataBlock
*
getSortedTableMergeScanBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pResBlock
,
int32_t
capacity
,
SOperatorInfo
*
pOperator
)
{
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSDataBlock
*
p
=
tsortGetSortedDataBlock
(
pHandle
);
if
(
p
==
NULL
)
{
return
NULL
;
}
blockDataEnsureCapacity
(
p
,
capacity
);
blockDataCleanup
(
pResBlock
);
blockDataEnsureCapacity
(
pResBlock
,
capacity
);
while
(
1
)
{
STupleHandle
*
pTupleHandle
=
tsortNextTuple
(
pHandle
);
...
...
@@ -2899,14 +2861,15 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
break
;
}
appendOneRowToDataBlock
(
p
,
pTupleHandle
);
if
(
p
->
info
.
rows
>=
capacity
)
{
appendOneRowToDataBlock
(
p
ResBlock
,
pTupleHandle
);
if
(
p
ResBlock
->
info
.
rows
>=
capacity
)
{
break
;
}
}
qDebug
(
"%s get sorted row blocks, rows:%d"
,
GET_TASKID
(
pTaskInfo
),
p
->
info
.
rows
);
return
(
p
->
info
.
rows
>
0
)
?
p
:
NULL
;
qDebug
(
"%s get sorted row blocks, rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pResBlock
->
info
.
rows
);
return
(
pResBlock
->
info
.
rows
>
0
)
?
pResBlock
:
NULL
;
}
SSDataBlock
*
doTableMergeScan
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -2935,7 +2898,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
}
SSDataBlock
*
pBlock
=
NULL
;
while
(
pInfo
->
tableStartIndex
<
tableListSize
)
{
pBlock
=
getSortedTableMergeScanBlockData
(
pInfo
->
pSortHandle
,
pOperator
->
resultInfo
.
capacity
,
pOperator
);
pBlock
=
getSortedTableMergeScanBlockData
(
pInfo
->
pSortHandle
,
p
Info
->
pResBlock
,
p
Operator
->
resultInfo
.
capacity
,
pOperator
);
if
(
pBlock
!=
NULL
)
{
pBlock
->
info
.
groupId
=
pInfo
->
groupId
;
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
...
...
@@ -2959,6 +2922,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
void
destroyTableMergeScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
STableMergeScanInfo
*
pTableScanInfo
=
(
STableMergeScanInfo
*
)
param
;
cleanupQueryTableDataCond
(
&
pTableScanInfo
->
cond
);
taosArrayDestroy
(
pTableScanInfo
->
sortSourceParams
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableScanInfo
->
dataReaders
);
++
i
)
{
STsdbReader
*
reader
=
taosArrayGetP
(
pTableScanInfo
->
dataReaders
,
i
);
...
...
@@ -2974,7 +2938,9 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
pTableScanInfo
->
pSortInputBlock
=
blockDataDestroy
(
pTableScanInfo
->
pSortInputBlock
);
taosArrayDestroy
(
pTableScanInfo
->
pSortInfo
);
cleanupExprSupp
(
&
pTableScanInfo
->
pseudoSup
);
taosMemoryFreeClear
(
pTableScanInfo
->
rowEntryInfoOffset
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -3031,8 +2997,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
}
if
(
pTableScanNode
->
scan
.
pScanPseudoCols
!=
NULL
)
{
pInfo
->
pPseudoExpr
=
createExprInfo
(
pTableScanNode
->
scan
.
pScanPseudoCols
,
NULL
,
&
pInfo
->
numOfPseudoExpr
);
pInfo
->
pPseudoCtx
=
createSqlFunctionCtx
(
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
&
pInfo
->
rowEntryInfoOffset
);
SExprSupp
*
pSup
=
&
pInfo
->
pseudoSup
;
pSup
->
pExprInfo
=
createExprInfo
(
pTableScanNode
->
scan
.
pScanPseudoCols
,
NULL
,
&
pSup
->
numOfExprs
);
pSup
->
pCtx
=
createSqlFunctionCtx
(
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
&
pSup
->
rowEntryInfoOffset
);
}
pInfo
->
scanInfo
=
(
SScanInfo
){.
numOfAsc
=
pTableScanNode
->
scanSeq
[
0
],
.
numOfDesc
=
pTableScanNode
->
scanSeq
[
1
]};
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
352965f7
...
...
@@ -375,6 +375,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
CLONE_NODE_FIELD
(
pMergeCondition
);
CLONE_NODE_FIELD
(
pOnConditions
);
COPY_SCALAR_FIELD
(
isSingleTableJoin
);
COPY_SCALAR_FIELD
(
inputTsOrder
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -440,6 +441,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD
(
watermark
);
COPY_SCALAR_FIELD
(
igExpired
);
COPY_SCALAR_FIELD
(
windowAlgo
);
COPY_SCALAR_FIELD
(
inputTsOrder
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
352965f7
...
...
@@ -1717,7 +1717,7 @@ static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
static
const
char
*
jkJoinPhysiPlanTargets
=
"Targets"
;
static
int32_t
physiJoinNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
S
JoinPhysiNode
*
pNode
=
(
const
S
JoinPhysiNode
*
)
pObj
;
const
S
SortMergeJoinPhysiNode
*
pNode
=
(
const
SSortMerge
JoinPhysiNode
*
)
pObj
;
int32_t
code
=
physicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -1737,7 +1737,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
}
static
int32_t
jsonToPhysiJoinNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
S
JoinPhysiNode
*
pNode
=
(
S
JoinPhysiNode
*
)
pObj
;
S
SortMergeJoinPhysiNode
*
pNode
=
(
SSortMerge
JoinPhysiNode
*
)
pObj
;
int32_t
code
=
jsonToPhysicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
352965f7
...
...
@@ -468,7 +468,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
{
S
JoinPhysiNode
*
pJoin
=
(
S
JoinPhysiNode
*
)
pNode
;
S
SortMergeJoinPhysiNode
*
pJoin
=
(
SSortMerge
JoinPhysiNode
*
)
pNode
;
res
=
walkPhysiNode
((
SPhysiNode
*
)
pNode
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkPhysiPlan
(
pJoin
->
pMergeCondition
,
order
,
walker
,
pContext
);
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
352965f7
...
...
@@ -287,7 +287,7 @@ SNode* nodesMakeNode(ENodeType type) {
case
QUERY_NODE_PHYSICAL_PLAN_PROJECT
:
return
makeNode
(
type
,
sizeof
(
SProjectPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
return
makeNode
(
type
,
sizeof
(
SJoinPhysiNode
));
return
makeNode
(
type
,
sizeof
(
S
SortMerge
JoinPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
:
return
makeNode
(
type
,
sizeof
(
SAggPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
...
...
@@ -883,7 +883,7 @@ void nodesDestroyNode(SNode* pNode) {
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
:
{
S
JoinPhysiNode
*
pPhyNode
=
(
S
JoinPhysiNode
*
)
pNode
;
S
SortMergeJoinPhysiNode
*
pPhyNode
=
(
SSortMerge
JoinPhysiNode
*
)
pNode
;
destroyPhysiNode
((
SPhysiNode
*
)
pPhyNode
);
nodesDestroyNode
(
pPhyNode
->
pMergeCondition
);
nodesDestroyNode
(
pPhyNode
->
pOnConditions
);
...
...
source/libs/parser/inc/parAst.h
浏览文件 @
352965f7
...
...
@@ -90,7 +90,7 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken*
SNode
*
createDurationValueNode
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pLiteral
);
SNode
*
createDefaultDatabaseCondValue
(
SAstCreateContext
*
pCxt
);
SNode
*
createPlaceholderValueNode
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pLiteral
);
SNode
*
setProjectionAlias
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
,
const
SToken
*
pAlias
);
SNode
*
setProjectionAlias
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
,
SToken
*
pAlias
);
SNode
*
createLogicConditionNode
(
SAstCreateContext
*
pCxt
,
ELogicConditionType
type
,
SNode
*
pParam1
,
SNode
*
pParam2
);
SNode
*
createOperatorNode
(
SAstCreateContext
*
pCxt
,
EOperatorType
type
,
SNode
*
pLeft
,
SNode
*
pRight
);
SNode
*
createBetweenAnd
(
SAstCreateContext
*
pCxt
,
SNode
*
pExpr
,
SNode
*
pLeft
,
SNode
*
pRight
);
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
352965f7
...
...
@@ -527,6 +527,7 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
}
if
(
QUERY_NODE_SELECT_STMT
==
nodeType
(
pSubquery
))
{
strcpy
(((
SSelectStmt
*
)
pSubquery
)
->
stmtName
,
tempTable
->
table
.
tableAlias
);
((
SSelectStmt
*
)
pSubquery
)
->
isSubquery
=
true
;
}
else
if
(
QUERY_NODE_SET_OPERATOR
==
nodeType
(
pSubquery
))
{
strcpy
(((
SSetOperator
*
)
pSubquery
)
->
stmtName
,
tempTable
->
table
.
tableAlias
);
}
...
...
@@ -637,8 +638,9 @@ SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd
return
createBetweenAnd
(
pCxt
,
createPrimaryKeyCol
(
pCxt
),
pStart
,
pEnd
);
}
SNode
*
setProjectionAlias
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
,
const
SToken
*
pAlias
)
{
SNode
*
setProjectionAlias
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
,
SToken
*
pAlias
)
{
CHECK_PARSER_STATUS
(
pCxt
);
trimEscape
(
pAlias
);
int32_t
len
=
TMIN
(
sizeof
(((
SExprNode
*
)
pNode
)
->
aliasName
)
-
1
,
pAlias
->
n
);
strncpy
(((
SExprNode
*
)
pNode
)
->
aliasName
,
pAlias
->
z
,
len
);
((
SExprNode
*
)
pNode
)
->
aliasName
[
len
]
=
'\0'
;
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
352965f7
...
...
@@ -739,12 +739,13 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
return
TSDB_CODE_SUCCESS
;
}
static
void
buildCreateTbReq
(
SVCreateTbReq
*
pTbReq
,
const
char
*
tname
,
STag
*
pTag
,
int64_t
suid
,
const
char
*
sname
,
SArray
*
tagName
,
uint8_t
tagNum
)
{
static
void
buildCreateTbReq
(
SVCreateTbReq
*
pTbReq
,
const
char
*
tname
,
STag
*
pTag
,
int64_t
suid
,
const
char
*
sname
,
SArray
*
tagName
,
uint8_t
tagNum
)
{
pTbReq
->
type
=
TD_CHILD_TABLE
;
pTbReq
->
name
=
strdup
(
tname
);
pTbReq
->
ctb
.
suid
=
suid
;
pTbReq
->
ctb
.
tagNum
=
tagNum
;
if
(
sname
)
pTbReq
->
ctb
.
name
=
strdup
(
sname
);
if
(
sname
)
pTbReq
->
ctb
.
name
=
strdup
(
sname
);
pTbReq
->
ctb
.
pTag
=
(
uint8_t
*
)
pTag
;
pTbReq
->
ctb
.
tagName
=
taosArrayDup
(
tagName
);
pTbReq
->
commentLen
=
-
1
;
...
...
@@ -969,7 +970,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
}
SSchema
*
pTagSchema
=
&
pSchema
[
pCxt
->
tags
.
boundColumns
[
i
]];
char
tmpTokenBuf
[
TSDB_MAX_BYTES_PER_ROW
]
=
{
0
};
// todo this can be optimize with parse column
char
tmpTokenBuf
[
TSDB_MAX_BYTES_PER_ROW
]
=
{
0
};
// todo this can be optimize with parse column
code
=
checkAndTrimValue
(
&
sToken
,
tmpTokenBuf
,
&
pCxt
->
msg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
...
...
@@ -1012,7 +1013,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
goto
end
;
}
buildCreateTbReq
(
&
pCxt
->
createTblReq
,
tName
,
pTag
,
pCxt
->
pTableMeta
->
suid
,
pCxt
->
sTableName
,
tagName
,
pCxt
->
pTableMeta
->
tableInfo
.
numOfTags
);
buildCreateTbReq
(
&
pCxt
->
createTblReq
,
tName
,
pTag
,
pCxt
->
pTableMeta
->
suid
,
pCxt
->
sTableName
,
tagName
,
pCxt
->
pTableMeta
->
tableInfo
.
numOfTags
);
end:
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTagVals
);
++
i
)
{
...
...
@@ -1650,7 +1652,6 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
static
int32_t
collectTableMetaKey
(
SInsertParseSyntaxCxt
*
pCxt
,
SToken
*
pTbToken
)
{
SName
name
;
CHECK_CODE
(
createSName
(
&
name
,
pTbToken
,
pCxt
->
pComCxt
->
acctId
,
pCxt
->
pComCxt
->
db
,
&
pCxt
->
msg
));
CHECK_CODE
(
reserveDbCfgInCache
(
pCxt
->
pComCxt
->
acctId
,
name
.
dbname
,
pCxt
->
pMetaCache
));
CHECK_CODE
(
reserveUserAuthInCacheExt
(
pCxt
->
pComCxt
->
pUser
,
&
name
,
AUTH_TYPE_WRITE
,
pCxt
->
pMetaCache
));
CHECK_CODE
(
reserveTableMetaInCacheExt
(
&
name
,
pCxt
->
pMetaCache
));
CHECK_CODE
(
reserveTableVgroupInCacheExt
(
&
name
,
pCxt
->
pMetaCache
));
...
...
@@ -2332,7 +2333,8 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
return
ret
;
}
buildCreateTbReq
(
&
smlHandle
->
tableExecHandle
.
createTblReq
,
tableName
,
pTag
,
pTableMeta
->
suid
,
NULL
,
tagName
,
pTableMeta
->
tableInfo
.
numOfTags
);
buildCreateTbReq
(
&
smlHandle
->
tableExecHandle
.
createTblReq
,
tableName
,
pTag
,
pTableMeta
->
suid
,
NULL
,
tagName
,
pTableMeta
->
tableInfo
.
numOfTags
);
taosArrayDestroy
(
tagName
);
smlHandle
->
tableExecHandle
.
createTblReq
.
ctb
.
name
=
taosMemoryMalloc
(
sTableNameLen
+
1
);
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
352965f7
...
...
@@ -92,7 +92,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case
TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG
:
return
"sliding value no larger than the interval value"
;
case
TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL
:
return
"sliding value can not less than 1% of interval value"
;
return
"sliding value can not less than 1%
%
of interval value"
;
case
TSDB_CODE_PAR_ONLY_ONE_JSON_TAG
:
return
"Only one tag if there is a json tag"
;
case
TSDB_CODE_PAR_INCORRECT_NUM_OF_COL
:
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
352965f7
...
...
@@ -339,6 +339,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pJoin
->
joinType
=
pJoinTable
->
joinType
;
pJoin
->
isSingleTableJoin
=
pJoinTable
->
table
.
singleTable
;
pJoin
->
inputTsOrder
=
ORDER_ASC
;
pJoin
->
node
.
groupAction
=
GROUP_ACTION_CLEAR
;
pJoin
->
node
.
requireDataOrder
=
DATA_ORDER_LEVEL_GLOBAL
;
pJoin
->
node
.
requireDataOrder
=
DATA_ORDER_LEVEL_GLOBAL
;
...
...
@@ -625,14 +626,14 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
static
int32_t
createWindowLogicNodeFinalize
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SWindowLogicNode
*
pWindow
,
SLogicNode
**
pLogicNode
)
{
int32_t
code
=
nodesCollectFuncs
(
pSelect
,
SQL_CLAUSE_WINDOW
,
fmIsWindowClauseFunc
,
&
pWindow
->
pFuncs
);
if
(
pCxt
->
pPlanCxt
->
streamQuery
)
{
pWindow
->
triggerType
=
pCxt
->
pPlanCxt
->
triggerType
;
pWindow
->
watermark
=
pCxt
->
pPlanCxt
->
watermark
;
pWindow
->
igExpired
=
pCxt
->
pPlanCxt
->
igExpired
;
}
pWindow
->
inputTsOrder
=
ORDER_ASC
;
int32_t
code
=
nodesCollectFuncs
(
pSelect
,
SQL_CLAUSE_WINDOW
,
fmIsWindowClauseFunc
,
&
pWindow
->
pFuncs
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteExprsForSelect
(
pWindow
->
pFuncs
,
pSelect
,
SQL_CLAUSE_WINDOW
);
}
...
...
@@ -861,7 +862,8 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
TSWAP
(
pProject
->
node
.
pLimit
,
pSelect
->
pLimit
);
TSWAP
(
pProject
->
node
.
pSlimit
,
pSelect
->
pSlimit
);
pProject
->
node
.
groupAction
=
GROUP_ACTION_CLEAR
;
pProject
->
node
.
groupAction
=
(
!
pSelect
->
isSubquery
&&
pCxt
->
pPlanCxt
->
streamQuery
)
?
GROUP_ACTION_KEEP
:
GROUP_ACTION_CLEAR
;
pProject
->
node
.
requireDataOrder
=
DATA_ORDER_LEVEL_NONE
;
pProject
->
node
.
resultDataOrder
=
DATA_ORDER_LEVEL_NONE
;
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
352965f7
...
...
@@ -993,25 +993,28 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) {
}
static
int32_t
sortPriKeyOptGetScanNodesImpl
(
SLogicNode
*
pNode
,
bool
*
pNotOptimize
,
SNodeList
**
pScanNodes
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
if
(
TSDB_SUPER_TABLE
!=
((
SScanLogicNode
*
)
pNode
)
->
tableType
)
{
return
nodesListMakeAppend
(
pScanNodes
,
(
SNode
*
)
pNode
);
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
{
SScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
pNode
;
if
(
NULL
!=
pScan
->
pGroupTags
)
{
*
pNotOptimize
=
true
;
return
TSDB_CODE_SUCCESS
;
}
break
;
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
code
=
return
nodesListMakeAppend
(
pScanNodes
,
(
SNode
*
)
pNode
);
}
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
{
int32_t
code
=
sortPriKeyOptGetScanNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
),
pNotOptimize
,
pScanNodes
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
sortPriKeyOptGetScanNodesImpl
((
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
1
),
pNotOptimize
,
pScanNodes
);
}
return
code
;
}
case
QUERY_NODE_LOGIC_PLAN_AGG
:
case
QUERY_NODE_LOGIC_PLAN_PARTITION
:
*
pNotOptimize
=
true
;
return
code
;
return
TSDB_CODE_SUCCESS
;
default:
break
;
}
...
...
@@ -1037,17 +1040,33 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) {
return
((
SOrderByExprNode
*
)
nodesListGetNode
(
pSort
->
pSortKeys
,
0
))
->
order
;
}
static
void
sortPriKeyOptSetParentOrder
(
SLogicNode
*
pNode
,
EOrder
order
)
{
if
(
NULL
==
pNode
)
{
return
;
}
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pNode
))
{
((
SWindowLogicNode
*
)
pNode
)
->
inputTsOrder
=
order
;
}
else
if
(
QUERY_NODE_LOGIC_PLAN_JOIN
==
nodeType
(
pNode
))
{
((
SJoinLogicNode
*
)
pNode
)
->
inputTsOrder
=
order
;
}
sortPriKeyOptSetParentOrder
(
pNode
->
pParent
,
order
);
}
static
int32_t
sortPriKeyOptApply
(
SOptimizeContext
*
pCxt
,
SLogicSubplan
*
pLogicSubplan
,
SSortLogicNode
*
pSort
,
SNodeList
*
pScanNodes
)
{
EOrder
order
=
sortPriKeyOptGetPriKeyOrder
(
pSort
);
if
(
ORDER_DESC
==
order
)
{
SNode
*
pScanNode
=
NULL
;
FOREACH
(
pScanNode
,
pScanNodes
)
{
SScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
pScanNode
;
if
(
pScan
->
scanSeq
[
0
]
>
0
)
{
TSWAP
(
pScan
->
scanSeq
[
0
],
pScan
->
scanSeq
[
1
]);
}
SNode
*
pScanNode
=
NULL
;
FOREACH
(
pScanNode
,
pScanNodes
)
{
SScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
pScanNode
;
if
(
ORDER_DESC
==
order
&&
pScan
->
scanSeq
[
0
]
>
0
)
{
TSWAP
(
pScan
->
scanSeq
[
0
],
pScan
->
scanSeq
[
1
]);
}
if
(
TSDB_SUPER_TABLE
==
pScan
->
tableType
)
{
pScan
->
scanType
=
SCAN_TYPE_TABLE_MERGE
;
pScan
->
node
.
resultDataOrder
=
DATA_ORDER_LEVEL_GLOBAL
;
pScan
->
node
.
requireDataOrder
=
DATA_ORDER_LEVEL_GLOBAL
;
}
sortPriKeyOptSetParentOrder
(
pScan
->
node
.
pParent
,
order
);
}
SLogicNode
*
pChild
=
(
SLogicNode
*
)
nodesListGetNode
(
pSort
->
node
.
pChildren
,
0
);
...
...
@@ -1613,10 +1632,10 @@ static void alignProjectionWithTarget(SLogicNode* pNode) {
}
SProjectLogicNode
*
pProjectNode
=
(
SProjectLogicNode
*
)
pNode
;
SNode
*
pProjection
=
NULL
;
SNode
*
pProjection
=
NULL
;
FOREACH
(
pProjection
,
pProjectNode
->
pProjections
)
{
SNode
*
pTarget
=
NULL
;
bool
keep
=
false
;
bool
keep
=
false
;
FOREACH
(
pTarget
,
pNode
->
pTargets
)
{
if
(
0
==
strcmp
(((
SColumnNode
*
)
pProjection
)
->
node
.
aliasName
,
((
SColumnNode
*
)
pTarget
)
->
colName
))
{
keep
=
true
;
...
...
@@ -2214,7 +2233,7 @@ static bool tagScanMayBeOptimized(SLogicNode* pNode) {
!
planOptNodeListHasTbname
(
pAgg
->
pGroupKeys
))
{
return
false
;
}
SNode
*
pGroupKey
=
NULL
;
FOREACH
(
pGroupKey
,
pAgg
->
pGroupKeys
)
{
SNode
*
pGroup
=
NULL
;
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
352965f7
...
...
@@ -415,7 +415,6 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS
SScanPhysiNode
*
pScanPhysiNode
,
SPhysiNode
**
pPhyNode
)
{
int32_t
code
=
createScanCols
(
pCxt
,
pScanPhysiNode
,
pScanLogicNode
->
pScanCols
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
code
=
addDataBlockSlots
(
pCxt
,
pScanPhysiNode
->
pScanCols
,
pScanPhysiNode
->
node
.
pOutputDataBlockDesc
);
}
...
...
@@ -622,8 +621,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
static
int32_t
createJoinPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SJoinLogicNode
*
pJoinLogicNode
,
SPhysiNode
**
pPhyNode
)
{
SJoinPhysiNode
*
pJoin
=
(
SJoinPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pJoinLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
);
S
SortMerge
JoinPhysiNode
*
pJoin
=
(
S
SortMerge
JoinPhysiNode
*
)
makePhysiNode
(
pCxt
,
(
SLogicNode
*
)
pJoinLogicNode
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
);
if
(
NULL
==
pJoin
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -975,6 +974,9 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
}
static
bool
projectCanMergeDataBlock
(
SProjectLogicNode
*
pProject
)
{
if
(
GROUP_ACTION_KEEP
==
pProject
->
node
.
groupAction
)
{
return
false
;
}
if
(
DATA_ORDER_LEVEL_NONE
==
pProject
->
node
.
resultDataOrder
)
{
return
true
;
}
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
352965f7
...
...
@@ -469,7 +469,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
return
code
;
}
static
int32_t
stbSplCreateMergeKeysByPrimaryKey
(
SNode
*
pPrimaryKey
,
SNodeList
**
pMergeKeys
)
{
static
int32_t
stbSplCreateMergeKeysByPrimaryKey
(
SNode
*
pPrimaryKey
,
EOrder
order
,
SNodeList
**
pMergeKeys
)
{
SOrderByExprNode
*
pMergeKey
=
(
SOrderByExprNode
*
)
nodesMakeNode
(
QUERY_NODE_ORDER_BY_EXPR
);
if
(
NULL
==
pMergeKey
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -479,7 +479,7 @@ static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList**
nodesDestroyNode
((
SNode
*
)
pMergeKey
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pMergeKey
->
order
=
ORDER_ASC
;
pMergeKey
->
order
=
order
;
pMergeKey
->
nullOrder
=
NULL_ORDER_FIRST
;
return
nodesListMakeStrictAppend
(
pMergeKeys
,
(
SNode
*
)
pMergeKey
);
}
...
...
@@ -491,7 +491,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
((
SWindowLogicNode
*
)
pPartWindow
)
->
windowAlgo
=
INTERVAL_ALGO_HASH
;
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
windowAlgo
=
INTERVAL_ALGO_MERGE
;
SNodeList
*
pMergeKeys
=
NULL
;
code
=
stbSplCreateMergeKeysByPrimaryKey
(((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
pTspk
,
&
pMergeKeys
);
code
=
stbSplCreateMergeKeysByPrimaryKey
(((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
pTspk
,
((
SWindowLogicNode
*
)
pInfo
->
pSplitNode
)
->
inputTsOrder
,
&
pMergeKeys
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
NULL
,
pInfo
->
pSplitNode
,
pMergeKeys
,
pPartWindow
,
true
);
}
...
...
@@ -579,7 +580,8 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
SLogicNode
*
pChild
=
(
SLogicNode
*
)
nodesListGetNode
(
pWindow
->
pChildren
,
0
);
SNodeList
*
pMergeKeys
=
NULL
;
int32_t
code
=
stbSplCreateMergeKeysByPrimaryKey
(((
SWindowLogicNode
*
)
pWindow
)
->
pTspk
,
&
pMergeKeys
);
int32_t
code
=
stbSplCreateMergeKeysByPrimaryKey
(((
SWindowLogicNode
*
)
pWindow
)
->
pTspk
,
((
SWindowLogicNode
*
)
pWindow
)
->
inputTsOrder
,
&
pMergeKeys
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
pInfo
->
pSubplan
,
pChild
,
pMergeKeys
,
(
SLogicNode
*
)
pChild
,
true
);
...
...
@@ -913,27 +915,70 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit
}
static
SNode
*
stbSplFindPrimaryKeyFromScan
(
SScanLogicNode
*
pScan
)
{
bool
find
=
false
;
SNode
*
pCol
=
NULL
;
FOREACH
(
pCol
,
pScan
->
pScanCols
)
{
if
(
PRIMARYKEY_TIMESTAMP_COL_ID
==
((
SColumnNode
*
)
pCol
)
->
colId
)
{
find
=
true
;
break
;
}
}
if
(
!
find
)
{
return
NULL
;
}
SNode
*
pTarget
=
NULL
;
FOREACH
(
pTarget
,
pScan
->
node
.
pTargets
)
{
if
(
nodesEqualNode
(
pTarget
,
pCol
))
{
return
pCol
;
}
}
return
NULL
;
nodesListStrictAppend
(
pScan
->
node
.
pTargets
,
nodesCloneNode
(
pCol
));
return
pCol
;
}
static
int32_t
stbSplCreateMergeScanNode
(
SScanLogicNode
*
pScan
,
SLogicNode
**
pOutputMergeScan
,
SNodeList
**
pOutputMergeKeys
)
{
SNodeList
*
pChildren
=
pScan
->
node
.
pChildren
;
pScan
->
node
.
pChildren
=
NULL
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SScanLogicNode
*
pMergeScan
=
(
SScanLogicNode
*
)
nodesCloneNode
((
SNode
*
)
pScan
);
if
(
NULL
==
pMergeScan
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
SNodeList
*
pMergeKeys
=
NULL
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pMergeScan
->
scanType
=
SCAN_TYPE_TABLE_MERGE
;
pMergeScan
->
node
.
pChildren
=
pChildren
;
splSetParent
((
SLogicNode
*
)
pMergeScan
);
code
=
stbSplCreateMergeKeysByPrimaryKey
(
stbSplFindPrimaryKeyFromScan
(
pMergeScan
),
pMergeScan
->
scanSeq
[
0
]
>
0
?
ORDER_ASC
:
ORDER_DESC
,
&
pMergeKeys
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pOutputMergeScan
=
(
SLogicNode
*
)
pMergeScan
;
*
pOutputMergeKeys
=
pMergeKeys
;
}
else
{
nodesDestroyNode
((
SNode
*
)
pMergeScan
);
nodesDestroyList
(
pMergeKeys
);
}
return
code
;
}
static
int32_t
stbSplSplitMergeScanNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SScanLogicNode
*
pScan
,
bool
groupSort
)
{
SNodeList
*
pMergeKeys
=
NULL
;
int32_t
code
=
stbSplCreateMergeKeysByPrimaryKey
(
stbSplFindPrimaryKeyFromScan
(
pScan
),
&
pMergeKeys
);
SLogicNode
*
pMergeScan
=
NULL
;
SNodeList
*
pMergeKeys
=
NULL
;
int32_t
code
=
stbSplCreateMergeScanNode
(
pScan
,
&
pMergeScan
,
&
pMergeKeys
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeNode
(
pCxt
,
pSubplan
,
(
SLogicNode
*
)
pScan
,
pMergeKeys
,
(
SLogicNode
*
)
p
Scan
,
groupSort
);
code
=
stbSplCreateMergeNode
(
pCxt
,
pSubplan
,
(
SLogicNode
*
)
pScan
,
pMergeKeys
,
pMerge
Scan
,
groupSort
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesListMakeStrictAppend
(
&
pSubplan
->
pChildren
,
(
SNode
*
)
splCreateScanSubplan
(
pCxt
,
(
SLogicNode
*
)
p
Scan
,
SPLIT_FLAG_STABLE_SPLIT
));
(
SNode
*
)
splCreateScanSubplan
(
pCxt
,
pMerge
Scan
,
SPLIT_FLAG_STABLE_SPLIT
));
}
pScan
->
scanType
=
SCAN_TYPE_TABLE_MERGE
;
++
(
pCxt
->
groupId
);
return
code
;
}
...
...
@@ -978,14 +1023,14 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
}
static
int32_t
stbSplCreateMergeKeysForPartitionNode
(
SLogicNode
*
pPart
,
SNodeList
**
pMergeKeys
)
{
S
Node
*
pPrimaryKey
=
nodesCloneNode
(
stbSplFindPrimaryKeyFromScan
((
SScanLogicNode
*
)
nodesListGetNode
(
pPart
->
pChildren
,
0
)
));
S
ScanLogicNode
*
pScan
=
(
SScanLogicNode
*
)
nodesListGetNode
(
pPart
->
pChildren
,
0
);
SNode
*
pPrimaryKey
=
nodesCloneNode
(
stbSplFindPrimaryKeyFromScan
(
pScan
));
if
(
NULL
==
pPrimaryKey
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
code
=
nodesListAppend
(
pPart
->
pTargets
,
pPrimaryKey
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
stbSplCreateMergeKeysByPrimaryKey
(
pPrimaryKey
,
pMergeKeys
);
code
=
stbSplCreateMergeKeysByPrimaryKey
(
pPrimaryKey
,
p
Scan
->
scanSeq
[
0
]
>
0
?
ORDER_ASC
:
ORDER_DESC
,
p
MergeKeys
);
}
return
code
;
}
...
...
source/libs/planner/src/planUtil.c
浏览文件 @
352965f7
...
...
@@ -124,7 +124,8 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode*
}
static
int32_t
adjustScanDataRequirement
(
SScanLogicNode
*
pScan
,
EDataOrderLevel
requirement
)
{
if
(
SCAN_TYPE_TABLE
!=
pScan
->
scanType
&&
SCAN_TYPE_TABLE_MERGE
!=
pScan
->
scanType
)
{
if
((
SCAN_TYPE_TABLE
!=
pScan
->
scanType
&&
SCAN_TYPE_TABLE_MERGE
!=
pScan
->
scanType
)
||
DATA_ORDER_LEVEL_GLOBAL
==
pScan
->
node
.
requireDataOrder
)
{
return
TSDB_CODE_SUCCESS
;
}
// The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK
...
...
source/libs/planner/test/planBasicTest.cpp
浏览文件 @
352965f7
...
...
@@ -24,9 +24,10 @@ TEST_F(PlanBasicTest, selectClause) {
useDb
(
"root"
,
"test"
);
run
(
"SELECT * FROM t1"
);
run
(
"SELECT 1 FROM t1"
);
run
(
"SELECT * FROM st1"
);
run
(
"SELECT 1 FROM st1"
);
run
(
"SELECT MAX(c1) c2, c2 FROM t1"
);
run
(
"SELECT MAX(c1) c2, c2 FROM st1"
);
}
TEST_F
(
PlanBasicTest
,
whereClause
)
{
...
...
source/libs/planner/test/planOptimizeTest.cpp
浏览文件 @
352965f7
...
...
@@ -53,6 +53,8 @@ TEST_F(PlanOptimizeTest, sortPrimaryKey) {
run
(
"SELECT c1 FROM t1 ORDER BY ts"
);
run
(
"SELECT c1 FROM st1 ORDER BY ts"
);
run
(
"SELECT c1 FROM t1 ORDER BY ts DESC"
);
run
(
"SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTART DESC"
);
...
...
source/libs/wal/src/walMgmt.c
浏览文件 @
352965f7
...
...
@@ -93,7 +93,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
}
// init ref
pWal
->
pRefHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_
U
BIGINT
),
true
,
HASH_ENTRY_LOCK
);
pWal
->
pRefHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
pWal
->
pRefHash
==
NULL
)
{
taosMemoryFree
(
pWal
);
return
NULL
;
...
...
source/libs/wal/src/walRead.c
浏览文件 @
352965f7
...
...
@@ -21,107 +21,112 @@ static int32_t walFetchBodyNew(SWalReader *pRead);
static
int32_t
walSkipFetchBodyNew
(
SWalReader
*
pRead
);
SWalReader
*
walOpenReader
(
SWal
*
pWal
,
SWalFilterCond
*
cond
)
{
SWalReader
*
pRead
=
taosMemoryCalloc
(
1
,
sizeof
(
SWalReader
));
if
(
pRead
==
NULL
)
{
SWalReader
*
pRead
er
=
taosMemoryCalloc
(
1
,
sizeof
(
SWalReader
));
if
(
pRead
er
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pRead
->
pWal
=
pWal
;
pRead
->
pIdxFile
=
NULL
;
pRead
->
pLogFile
=
NULL
;
pRead
->
curVersion
=
-
1
;
pRead
->
curFileFirstVer
=
-
1
;
pRead
->
curInvalid
=
1
;
pRead
->
capacity
=
0
;
pReader
->
pWal
=
pWal
;
pReader
->
readerId
=
tGenIdPI64
();
pReader
->
pIdxFile
=
NULL
;
pReader
->
pLogFile
=
NULL
;
pReader
->
curVersion
=
-
1
;
pReader
->
curFileFirstVer
=
-
1
;
pReader
->
curInvalid
=
1
;
pReader
->
capacity
=
0
;
if
(
cond
)
{
pRead
->
cond
=
*
cond
;
pRead
er
->
cond
=
*
cond
;
}
else
{
pRead
->
cond
.
scanMeta
=
0
;
pRead
->
cond
.
scanUncommited
=
0
;
pRead
->
cond
.
enableRef
=
0
;
pReader
->
cond
.
scanUncommited
=
0
;
pReader
->
cond
.
scanNotApplied
=
0
;
pReader
->
cond
.
scanMeta
=
0
;
pReader
->
cond
.
enableRef
=
0
;
}
taosThreadMutexInit
(
&
pRead
->
mutex
,
NULL
);
taosThreadMutexInit
(
&
pRead
er
->
mutex
,
NULL
);
/*if (pRead->cond.enableRef) {*/
/*walOpenRef(pWal);*/
/*}*/
pRead
->
pHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
));
if
(
pRead
->
pHead
==
NULL
)
{
pReader
->
pHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
));
if
(
pReader
->
pHead
==
NULL
)
{
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
taosMemoryFree
(
pRead
);
taosMemoryFree
(
pRead
er
);
return
NULL
;
}
return
pRead
;
/*if (pReader->cond.enableRef) {*/
/* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/
/*}*/
return
pReader
;
}
void
walCloseReader
(
SWalReader
*
pRead
)
{
taosCloseFile
(
&
pRead
->
pIdxFile
);
taosCloseFile
(
&
pRead
->
pLogFile
);
taosMemoryFreeClear
(
pRead
->
pHead
);
taosMemoryFree
(
pRead
);
void
walCloseReader
(
SWalReader
*
pReader
)
{
taosCloseFile
(
&
pReader
->
pIdxFile
);
taosCloseFile
(
&
pReader
->
pLogFile
);
/*if (pReader->cond.enableRef) {*/
/*taosHashRemove(pReader->pWal->pRefHash, &pReader->readerId, sizeof(int64_t));*/
/*}*/
taosMemoryFreeClear
(
pReader
->
pHead
);
taosMemoryFree
(
pReader
);
}
int32_t
walNextValidMsg
(
SWalReader
*
pRead
)
{
int64_t
fetchVer
=
pRead
->
curVersion
;
int64_t
lastVer
=
walGetLastVer
(
pRead
->
pWal
);
int64_t
committedVer
=
walGetCommittedVer
(
pRead
->
pWal
);
int64_t
appliedVer
=
walGetAppliedVer
(
pRead
->
pWal
);
int64_t
endVer
=
pRead
->
cond
.
scanUncommited
?
lastVer
:
committedVer
;
int32_t
walNextValidMsg
(
SWalReader
*
pRead
er
)
{
int64_t
fetchVer
=
pRead
er
->
curVersion
;
int64_t
lastVer
=
walGetLastVer
(
pRead
er
->
pWal
);
int64_t
committedVer
=
walGetCommittedVer
(
pRead
er
->
pWal
);
int64_t
appliedVer
=
walGetAppliedVer
(
pRead
er
->
pWal
);
int64_t
endVer
=
pRead
er
->
cond
.
scanUncommited
?
lastVer
:
committedVer
;
endVer
=
TMIN
(
appliedVer
,
endVer
);
wDebug
(
"vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld"
,
pRead
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
appliedVer
,
endVer
);
pRead
->
curStopped
=
0
;
pRead
er
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
appliedVer
,
endVer
);
pRead
er
->
curStopped
=
0
;
while
(
fetchVer
<=
endVer
)
{
if
(
walFetchHeadNew
(
pRead
,
fetchVer
)
<
0
)
{
if
(
walFetchHeadNew
(
pRead
er
,
fetchVer
)
<
0
)
{
return
-
1
;
}
if
(
pRead
->
pHead
->
head
.
msgType
==
TDMT_VND_SUBMIT
||
(
IS_META_MSG
(
pRead
->
pHead
->
head
.
msgType
)
&&
pRead
->
cond
.
scanMeta
))
{
if
(
walFetchBodyNew
(
pRead
)
<
0
)
{
if
(
pRead
er
->
pHead
->
head
.
msgType
==
TDMT_VND_SUBMIT
||
(
IS_META_MSG
(
pRead
er
->
pHead
->
head
.
msgType
)
&&
pReader
->
cond
.
scanMeta
))
{
if
(
walFetchBodyNew
(
pRead
er
)
<
0
)
{
return
-
1
;
}
return
0
;
}
else
{
if
(
walSkipFetchBodyNew
(
pRead
)
<
0
)
{
if
(
walSkipFetchBodyNew
(
pRead
er
)
<
0
)
{
return
-
1
;
}
fetchVer
++
;
ASSERT
(
fetchVer
==
pRead
->
curVersion
);
ASSERT
(
fetchVer
==
pRead
er
->
curVersion
);
}
}
pRead
->
curStopped
=
1
;
pRead
er
->
curStopped
=
1
;
return
-
1
;
}
static
int64_t
walReadSeekFilePos
(
SWalReader
*
pRead
,
int64_t
fileFirstVer
,
int64_t
ver
)
{
static
int64_t
walReadSeekFilePos
(
SWalReader
*
pRead
er
,
int64_t
fileFirstVer
,
int64_t
ver
)
{
int64_t
ret
=
0
;
TdFilePtr
pIdxTFile
=
pRead
->
pIdxFile
;
TdFilePtr
pLogTFile
=
pRead
->
pLogFile
;
TdFilePtr
pIdxTFile
=
pRead
er
->
pIdxFile
;
TdFilePtr
pLogTFile
=
pRead
er
->
pLogFile
;
// seek position
int64_t
offset
=
(
ver
-
fileFirstVer
)
*
sizeof
(
SWalIdxEntry
);
ret
=
taosLSeekFile
(
pIdxTFile
,
offset
,
SEEK_SET
);
if
(
ret
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, failed to seek idx file, index:%"
PRId64
", pos:%"
PRId64
", since %s"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
offset
,
terrstr
());
wError
(
"vgId:%d, failed to seek idx file, index:%"
PRId64
", pos:%"
PRId64
", since %s"
,
pRead
er
->
pWal
->
cfg
.
vgId
,
ver
,
offset
,
terrstr
());
return
-
1
;
}
SWalIdxEntry
entry
=
{
0
};
if
((
ret
=
taosReadFile
(
pIdxTFile
,
&
entry
,
sizeof
(
SWalIdxEntry
)))
!=
sizeof
(
SWalIdxEntry
))
{
if
(
ret
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, failed to read idx file, since %s"
,
pRead
->
pWal
->
cfg
.
vgId
,
terrstr
());
wError
(
"vgId:%d, failed to read idx file, since %s"
,
pRead
er
->
pWal
->
cfg
.
vgId
,
terrstr
());
}
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
wError
(
"vgId:%d, read idx file incompletely, read bytes %"
PRId64
", bytes should be %"
PRIu64
,
pRead
->
pWal
->
cfg
.
vgId
,
ret
,
sizeof
(
SWalIdxEntry
));
pRead
er
->
pWal
->
cfg
.
vgId
,
ret
,
sizeof
(
SWalIdxEntry
));
}
return
-
1
;
}
...
...
@@ -130,79 +135,79 @@ static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64
ret
=
taosLSeekFile
(
pLogTFile
,
entry
.
offset
,
SEEK_SET
);
if
(
ret
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, failed to seek log file, index:%"
PRId64
", pos:%"
PRId64
", since %s"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
entry
.
offset
,
terrstr
());
wError
(
"vgId:%d, failed to seek log file, index:%"
PRId64
", pos:%"
PRId64
", since %s"
,
pRead
er
->
pWal
->
cfg
.
vgId
,
ver
,
entry
.
offset
,
terrstr
());
return
-
1
;
}
return
ret
;
}
static
int32_t
walReadChangeFile
(
SWalReader
*
pRead
,
int64_t
fileFirstVer
)
{
static
int32_t
walReadChangeFile
(
SWalReader
*
pRead
er
,
int64_t
fileFirstVer
)
{
char
fnameStr
[
WAL_FILE_LEN
];
taosCloseFile
(
&
pRead
->
pIdxFile
);
taosCloseFile
(
&
pRead
->
pLogFile
);
taosCloseFile
(
&
pRead
er
->
pIdxFile
);
taosCloseFile
(
&
pRead
er
->
pLogFile
);
walBuildLogName
(
pRead
->
pWal
,
fileFirstVer
,
fnameStr
);
TdFilePtr
pLog
T
File
=
taosOpenFile
(
fnameStr
,
TD_FILE_READ
);
if
(
pLog
T
File
==
NULL
)
{
walBuildLogName
(
pRead
er
->
pWal
,
fileFirstVer
,
fnameStr
);
TdFilePtr
pLogFile
=
taosOpenFile
(
fnameStr
,
TD_FILE_READ
);
if
(
pLogFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, cannot open file %s, since %s"
,
pRead
->
pWal
->
cfg
.
vgId
,
fnameStr
,
terrstr
());
wError
(
"vgId:%d, cannot open file %s, since %s"
,
pRead
er
->
pWal
->
cfg
.
vgId
,
fnameStr
,
terrstr
());
return
-
1
;
}
pRead
->
pLogFile
=
pLogT
File
;
pRead
er
->
pLogFile
=
pLog
File
;
walBuildIdxName
(
pRead
->
pWal
,
fileFirstVer
,
fnameStr
);
TdFilePtr
pIdx
T
File
=
taosOpenFile
(
fnameStr
,
TD_FILE_READ
);
if
(
pIdx
T
File
==
NULL
)
{
walBuildIdxName
(
pRead
er
->
pWal
,
fileFirstVer
,
fnameStr
);
TdFilePtr
pIdxFile
=
taosOpenFile
(
fnameStr
,
TD_FILE_READ
);
if
(
pIdxFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, cannot open file %s, since %s"
,
pRead
->
pWal
->
cfg
.
vgId
,
fnameStr
,
terrstr
());
wError
(
"vgId:%d, cannot open file %s, since %s"
,
pRead
er
->
pWal
->
cfg
.
vgId
,
fnameStr
,
terrstr
());
return
-
1
;
}
pRead
->
pIdxFile
=
pIdxT
File
;
pRead
er
->
pIdxFile
=
pIdx
File
;
return
0
;
}
int32_t
walReadSeekVerImpl
(
SWalReader
*
pRead
,
int64_t
ver
)
{
SWal
*
pWal
=
pRead
->
pWal
;
int32_t
walReadSeekVerImpl
(
SWalReader
*
pRead
er
,
int64_t
ver
)
{
SWal
*
pWal
=
pRead
er
->
pWal
;
// bsearch in fileSet
SWalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
// bsearch in fileSet
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
if
(
pRead
->
curFileFirstVer
!=
pRet
->
firstVer
)
{
if
(
pRead
er
->
curFileFirstVer
!=
pRet
->
firstVer
)
{
// error code was set inner
if
(
walReadChangeFile
(
pRead
,
pRet
->
firstVer
)
<
0
)
{
if
(
walReadChangeFile
(
pRead
er
,
pRet
->
firstVer
)
<
0
)
{
return
-
1
;
}
}
// error code was set inner
if
(
walReadSeekFilePos
(
pRead
,
pRet
->
firstVer
,
ver
)
<
0
)
{
if
(
walReadSeekFilePos
(
pRead
er
,
pRet
->
firstVer
,
ver
)
<
0
)
{
return
-
1
;
}
wDebug
(
"wal version reset from %ld(invalid: %d) to %ld"
,
pRead
->
curVersion
,
pRead
->
curInvalid
,
ver
);
wDebug
(
"wal version reset from %ld(invalid: %d) to %ld"
,
pRead
er
->
curVersion
,
pReader
->
curInvalid
,
ver
);
pRead
->
curVersion
=
ver
;
pRead
er
->
curVersion
=
ver
;
return
0
;
}
int32_t
walReadSeekVer
(
SWalReader
*
pRead
,
int64_t
ver
)
{
SWal
*
pWal
=
pRead
->
pWal
;
if
(
!
pRead
->
curInvalid
&&
ver
==
pRead
->
curVersion
)
{
int32_t
walReadSeekVer
(
SWalReader
*
pRead
er
,
int64_t
ver
)
{
SWal
*
pWal
=
pRead
er
->
pWal
;
if
(
!
pRead
er
->
curInvalid
&&
ver
==
pReader
->
curVersion
)
{
wDebug
(
"wal version %ld match, no need to reset"
,
ver
);
return
0
;
}
pRead
->
curInvalid
=
1
;
pRead
->
curVersion
=
ver
;
pRead
er
->
curInvalid
=
1
;
pRead
er
->
curVersion
=
ver
;
if
(
ver
>
pWal
->
vers
.
lastVer
||
ver
<
pWal
->
vers
.
firstVer
)
{
wDebug
(
"vgId:%d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
wDebug
(
"vgId:%d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
er
->
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
...
...
@@ -210,7 +215,7 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
if
(
ver
<
pWal
->
vers
.
snapshotVer
)
{
}
if
(
walReadSeekVerImpl
(
pRead
,
ver
)
<
0
)
{
if
(
walReadSeekVerImpl
(
pRead
er
,
ver
)
<
0
)
{
return
-
1
;
}
...
...
source/libs/wal/src/walRef.c
0 → 100644
浏览文件 @
352965f7
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cJSON.h"
#include "os.h"
#include "taoserror.h"
#include "tutil.h"
#include "walInt.h"
SWalRef
*
walOpenRef
(
SWal
*
pWal
)
{
SWalRef
*
pRef
=
taosMemoryCalloc
(
1
,
sizeof
(
SWalRef
));
if
(
pRef
==
NULL
)
{
return
NULL
;
}
pRef
->
refId
=
tGenIdPI64
();
pRef
->
refVer
=
-
1
;
pRef
->
refFile
=
-
1
;
pRef
->
pWal
=
pWal
;
taosHashPut
(
pWal
->
pRefHash
,
&
pRef
->
refId
,
sizeof
(
int64_t
),
&
pRef
,
sizeof
(
void
*
));
return
pRef
;
}
void
walCloseRef
(
SWal
*
pWal
,
int64_t
refId
)
{
SWalRef
*
pRef
=
*
(
SWalRef
**
)
taosHashGet
(
pWal
->
pRefHash
,
&
refId
,
sizeof
(
int64_t
));
taosHashRemove
(
pWal
->
pRefHash
,
&
refId
,
sizeof
(
int64_t
));
taosMemoryFree
(
pRef
);
}
int32_t
walRefVer
(
SWalRef
*
pRef
,
int64_t
ver
)
{
SWal
*
pWal
=
pRef
->
pWal
;
if
(
pRef
->
refVer
!=
ver
)
{
taosThreadMutexLock
(
&
pWal
->
mutex
);
if
(
ver
<
pWal
->
vers
.
firstVer
||
ver
>
pWal
->
vers
.
lastVer
)
{
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
terrno
=
TSDB_CODE_WAL_INVALID_VER
;
return
-
1
;
}
pRef
->
refVer
=
ver
;
// bsearch in fileSet
SWalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
pRef
->
refFile
=
pRet
->
firstVer
;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
}
return
0
;
}
void
walUnrefVer
(
SWalRef
*
pRef
)
{
pRef
->
refId
=
-
1
;
pRef
->
refFile
=
-
1
;
}
SWalRef
*
walRefCommittedVer
(
SWal
*
pWal
)
{
SWalRef
*
pRef
=
walOpenRef
(
pWal
);
if
(
pRef
==
NULL
)
{
return
NULL
;
}
taosThreadMutexLock
(
&
pWal
->
mutex
);
int64_t
ver
=
walGetCommittedVer
(
pWal
);
pRef
->
refVer
=
ver
;
// bsearch in fileSet
SWalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
pRef
->
refFile
=
pRet
->
firstVer
;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
pRef
;
}
source/libs/wal/src/walWrite.c
浏览文件 @
352965f7
...
...
@@ -26,7 +26,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
pIter
=
taosHashIterate
(
pWal
->
pRefHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SWalRef
*
pRef
=
(
SWalRef
*
)
pIter
;
if
(
pRef
->
v
er
!=
-
1
)
{
if
(
pRef
->
refV
er
!=
-
1
)
{
taosHashCancelIterate
(
pWal
->
pRefHash
,
pIter
);
return
-
1
;
}
...
...
@@ -215,22 +215,23 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
static
FORCE_INLINE
int32_t
walCheckAndRoll
(
SWal
*
pWal
)
{
if
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
)
{
/*pWal->vers.firstVer = index;*/
if
(
walRollImpl
(
pWal
)
<
0
)
{
return
-
1
;
}
}
else
{
int64_t
passed
=
walGetSeq
()
-
pWal
->
lastRollSeq
;
if
(
pWal
->
cfg
.
rollPeriod
!=
-
1
&&
pWal
->
cfg
.
rollPeriod
!=
0
&&
passed
>
pWal
->
cfg
.
rollPeriod
)
{
if
(
walRollImpl
(
pWal
)
<
0
)
{
return
-
1
;
}
}
else
if
(
pWal
->
cfg
.
segSize
!=
-
1
&&
pWal
->
cfg
.
segSize
!=
0
&&
walGetLastFileSize
(
pWal
)
>
pWal
->
cfg
.
segSize
)
{
if
(
walRollImpl
(
pWal
)
<
0
)
{
return
-
1
;
}
return
0
;
}
int64_t
passed
=
walGetSeq
()
-
pWal
->
lastRollSeq
;
if
(
pWal
->
cfg
.
rollPeriod
!=
-
1
&&
pWal
->
cfg
.
rollPeriod
!=
0
&&
passed
>
pWal
->
cfg
.
rollPeriod
)
{
if
(
walRollImpl
(
pWal
)
<
0
)
{
return
-
1
;
}
}
else
if
(
pWal
->
cfg
.
segSize
!=
-
1
&&
pWal
->
cfg
.
segSize
!=
0
&&
walGetLastFileSize
(
pWal
)
>
pWal
->
cfg
.
segSize
)
{
if
(
walRollImpl
(
pWal
)
<
0
)
{
return
-
1
;
}
}
return
0
;
}
...
...
@@ -260,6 +261,16 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal
->
vers
.
snapshotVer
=
ver
;
int
ts
=
taosGetTimestampSec
();
int64_t
minVerToDelete
=
ver
;
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pWal
->
pRefHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SWalRef
*
pRef
=
*
(
SWalRef
**
)
pIter
;
if
(
pRef
->
refVer
==
-
1
)
continue
;
minVerToDelete
=
TMIN
(
minVerToDelete
,
pRef
->
refVer
);
}
int
deleteCnt
=
0
;
int64_t
newTotSize
=
pWal
->
totSize
;
SWalFileInfo
tmp
;
...
...
source/util/src/terror.c
浏览文件 @
352965f7
...
...
@@ -512,7 +512,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_UNIT, "Cannot use 'year' as
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG
,
"Interval offset should be shorter than interval"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_SLIDING_UNIT
,
"Does not support sliding when interval is natural month/year"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG
,
"sliding value no larger than the interval value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL
,
"sliding value can not less than 1% of interval value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL
,
"sliding value can not less than 1%
%
of interval value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_ONLY_ONE_JSON_TAG
,
"Only one tag if there is a json tag"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INCORRECT_NUM_OF_COL
,
"Query block has incorrect number of result columns"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL
,
"Incorrect TIMESTAMP value"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录