Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5d98a319
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5d98a319
编写于
6月 01, 2023
作者:
H
Haojun Liao
提交者:
GitHub
6月 01, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21515 from taosdata/feature/TD-19042
feat: support create topic as stable with conditions
上级
46f731e4
e4866d18
变更
28
展开全部
隐藏空白更改
内联
并排
Showing
28 changed file
with
2599 addition
and
983 deletion
+2599
-983
include/common/tmsg.h
include/common/tmsg.h
+4
-4
include/common/ttokendef.h
include/common/ttokendef.h
+1
-0
include/libs/executor/executor.h
include/libs/executor/executor.h
+2
-0
include/libs/nodes/cmdnodes.h
include/libs/nodes/cmdnodes.h
+1
-0
include/libs/qcom/query.h
include/libs/qcom/query.h
+7
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+13
-7
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+17
-1
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+2
-2
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+8
-0
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+2
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+24
-12
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+19
-10
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+8
-27
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+17
-1
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+1
-0
source/libs/parser/inc/parAst.h
source/libs/parser/inc/parAst.h
+1
-1
source/libs/parser/inc/parUtil.h
source/libs/parser/inc/parUtil.h
+1
-0
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+2
-2
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+5
-10
source/libs/parser/src/parAstParser.c
source/libs/parser/src/parAstParser.c
+5
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+108
-2
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+16
-0
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+2075
-904
source/libs/parser/test/parInitialCTest.cpp
source/libs/parser/test/parInitialCTest.cpp
+9
-0
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+12
-0
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-0
tests/script/tsim/tmq/topic.sim
tests/script/tsim/tmq/topic.sim
+11
-0
tests/system-test/7-tmq/stbFilterWhere.py
tests/system-test/7-tmq/stbFilterWhere.py
+227
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
5d98a319
...
...
@@ -2009,10 +2009,8 @@ typedef struct {
int8_t
withMeta
;
char
*
sql
;
char
subDbName
[
TSDB_DB_FNAME_LEN
];
union
{
char
*
ast
;
char
subStbName
[
TSDB_TABLE_FNAME_LEN
];
};
char
*
ast
;
char
subStbName
[
TSDB_TABLE_FNAME_LEN
];
}
SCMCreateTopicReq
;
int32_t
tSerializeSCMCreateTopicReq
(
void
*
buf
,
int32_t
bufLen
,
const
SCMCreateTopicReq
*
pReq
);
...
...
@@ -2822,6 +2820,7 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen
+=
taosEncodeString
(
buf
,
pReq
->
qmsg
);
}
else
if
(
pReq
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
suid
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
qmsg
);
}
return
tlen
;
}
...
...
@@ -2838,6 +2837,7 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
buf
=
taosDecodeString
(
buf
,
&
pReq
->
qmsg
);
}
else
if
(
pReq
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
suid
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
qmsg
);
}
return
(
void
*
)
buf
;
}
...
...
include/common/ttokendef.h
浏览文件 @
5d98a319
...
...
@@ -354,6 +354,7 @@
#define TK_WAL 336
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602
...
...
include/libs/executor/executor.h
浏览文件 @
5d98a319
...
...
@@ -82,6 +82,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
pReaderHandle
,
int32_t
vgId
,
int32_t
*
numOfCols
,
uint64_t
id
);
int32_t
qGetTableList
(
int64_t
suid
,
void
*
pVnode
,
void
*
node
,
SArray
**
tableList
,
void
*
pTaskInfo
);
/**
* set the task Id, usually used by message queue process
* @param tinfo
...
...
include/libs/nodes/cmdnodes.h
浏览文件 @
5d98a319
...
...
@@ -364,6 +364,7 @@ typedef struct SCreateTopicStmt {
bool
ignoreExists
;
bool
withMeta
;
SNode
*
pQuery
;
SNode
*
pWhere
;
}
SCreateTopicStmt
;
typedef
struct
SDropTopicStmt
{
...
...
include/libs/qcom/query.h
浏览文件 @
5d98a319
...
...
@@ -51,6 +51,12 @@ typedef enum {
TARGET_TYPE_OTHER
,
}
ETargetType
;
typedef
enum
{
TCOL_TYPE_COLUMN
=
1
,
TCOL_TYPE_TAG
,
TCOL_TYPE_NONE
,
}
ETableColumnType
;
#define QUERY_POLICY_VNODE 1
#define QUERY_POLICY_HYBRID 2
#define QUERY_POLICY_QNODE 3
...
...
@@ -253,6 +259,7 @@ void destroyQueryExecRes(SExecResult* pRes);
int32_t
dataConverToStr
(
char
*
str
,
int
type
,
void
*
buf
,
int32_t
bufSize
,
int32_t
*
len
);
char
*
parseTagDatatoJson
(
void
*
p
);
int32_t
cloneTableMeta
(
STableMeta
*
pSrc
,
STableMeta
**
pDst
);
void
getColumnTypeFromMeta
(
STableMeta
*
pMeta
,
char
*
pName
,
ETableColumnType
*
pType
);
int32_t
cloneDbVgInfo
(
SDBVgInfo
*
pSrc
,
SDBVgInfo
**
pDst
);
int32_t
cloneSVreateTbReq
(
SVCreateTbReq
*
pSrc
,
SVCreateTbReq
**
pDst
);
void
freeVgInfo
(
SDBVgInfo
*
vgInfo
);
...
...
source/common/src/tmsg.c
浏览文件 @
5d98a319
...
...
@@ -4002,11 +4002,16 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if
(
tEncodeI8
(
&
encoder
,
pReq
->
withMeta
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
subDbName
)
<
0
)
return
-
1
;
if
(
TOPIC_SUB_TYPE__DB
==
pReq
->
subType
)
{
}
else
if
(
TOPIC_SUB_TYPE__TABLE
==
pReq
->
subType
)
{
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
subStbName
)
<
0
)
return
-
1
;
}
else
{
if
(
tEncodeI32
(
&
encoder
,
strlen
(
pReq
->
ast
))
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
ast
)
<
0
)
return
-
1
;
if
(
TOPIC_SUB_TYPE__TABLE
==
pReq
->
subType
)
{
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
subStbName
)
<
0
)
return
-
1
;
}
if
(
pReq
->
ast
&&
strlen
(
pReq
->
ast
)
>
0
)
{
if
(
tEncodeI32
(
&
encoder
,
strlen
(
pReq
->
ast
))
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
ast
)
<
0
)
return
-
1
;
}
else
{
if
(
tEncodeI32
(
&
encoder
,
0
)
<
0
)
return
-
1
;
}
}
if
(
tEncodeI32
(
&
encoder
,
strlen
(
pReq
->
sql
))
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
sql
)
<
0
)
return
-
1
;
...
...
@@ -4032,9 +4037,10 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
withMeta
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
subDbName
)
<
0
)
return
-
1
;
if
(
TOPIC_SUB_TYPE__DB
==
pReq
->
subType
)
{
}
else
if
(
TOPIC_SUB_TYPE__TABLE
==
pReq
->
subType
)
{
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
subStbName
)
<
0
)
return
-
1
;
}
else
{
if
(
TOPIC_SUB_TYPE__TABLE
==
pReq
->
subType
)
{
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
subStbName
)
<
0
)
return
-
1
;
}
if
(
tDecodeI32
(
&
decoder
,
&
astLen
)
<
0
)
return
-
1
;
if
(
astLen
>
0
)
{
pReq
->
ast
=
taosMemoryCalloc
(
1
,
astLen
+
1
);
...
...
@@ -4057,7 +4063,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
void
tFreeSCMCreateTopicReq
(
SCMCreateTopicReq
*
pReq
)
{
taosMemoryFreeClear
(
pReq
->
sql
);
if
(
TOPIC_SUB_TYPE__
COLUMN
=
=
pReq
->
subType
)
{
if
(
TOPIC_SUB_TYPE__
DB
!
=
pReq
->
subType
)
{
taosMemoryFreeClear
(
pReq
->
ast
);
}
}
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
5d98a319
...
...
@@ -513,7 +513,23 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
}
else
if
(
pTopic
->
subType
==
TOPIC_SUB_TYPE__TABLE
&&
pTopic
->
ast
!=
NULL
){
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pTopic
->
ast
,
&
pAst
)
!=
0
)
{
mError
(
"topic:%s, failed to create since %s"
,
pTopic
->
name
,
terrstr
());
return
-
1
;
}
SPlanContext
cxt
=
{.
pAstRoot
=
pAst
,
.
topicQuery
=
true
};
if
(
qCreateQueryPlan
(
&
cxt
,
&
pPlan
,
NULL
)
!=
0
)
{
mError
(
"failed to create topic:%s since %s"
,
pTopic
->
name
,
terrstr
());
nodesDestroyNode
(
pAst
);
return
-
1
;
}
nodesDestroyNode
(
pAst
);
}
if
(
pPlan
){
int32_t
levelNum
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
if
(
levelNum
!=
1
)
{
qDestroyQueryPlan
(
pPlan
);
...
...
@@ -554,7 +570,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
mDebug
(
"init subscription %s for topic:%s assign vgId:%d"
,
pSub
->
key
,
pTopic
->
name
,
pVgEp
->
vgId
);
if
(
p
Topic
->
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
p
Subplan
)
{
int32_t
msgLen
;
pSubplan
->
execNode
.
epSet
=
pVgEp
->
epSet
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
5d98a319
...
...
@@ -1230,7 +1230,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
mInfo
(
"topic:%s, check tag and column modifiable, stb:%s suid:%"
PRId64
" colId:%d, subType:%d sql:%s"
,
pTopic
->
name
,
stbFullName
,
suid
,
colId
,
pTopic
->
subType
,
pTopic
->
sql
);
if
(
pTopic
->
subType
!=
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
pTopic
->
ast
==
NULL
)
{
sdbRelease
(
pSdb
,
pTopic
);
continue
;
}
...
...
@@ -2272,7 +2272,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
}
}
if
(
pTopic
->
subType
!=
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
pTopic
->
ast
==
NULL
)
{
sdbRelease
(
pSdb
,
pTopic
);
continue
;
}
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
5d98a319
...
...
@@ -422,6 +422,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
mError
(
"failed to create topic:%s since %s"
,
pCreate
->
name
,
terrstr
());
taosMemoryFree
(
topicObj
.
ast
);
taosMemoryFree
(
topicObj
.
sql
);
nodesDestroyNode
(
pAst
);
return
-
1
;
}
...
...
@@ -429,6 +430,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if
(
topicObj
.
ntbColIds
==
NULL
)
{
taosMemoryFree
(
topicObj
.
ast
);
taosMemoryFree
(
topicObj
.
sql
);
nodesDestroyNode
(
pAst
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
...
...
@@ -444,6 +446,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
taosMemoryFree
(
topicObj
.
ast
);
taosMemoryFree
(
topicObj
.
sql
);
nodesDestroyNode
(
pAst
);
return
-
1
;
}
...
...
@@ -465,6 +468,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
strcpy
(
topicObj
.
stbName
,
pCreate
->
subStbName
);
topicObj
.
stbUid
=
pStb
->
uid
;
mndReleaseStb
(
pMnode
,
pStb
);
if
(
pCreate
->
ast
!=
NULL
){
qDebugL
(
"topic:%s ast %s"
,
topicObj
.
name
,
pCreate
->
ast
);
topicObj
.
ast
=
taosStrdup
(
pCreate
->
ast
);
topicObj
.
astLen
=
strlen
(
pCreate
->
ast
)
+
1
;
}
}
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
/*topicObj.ast = NULL;*/
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
5d98a319
...
...
@@ -72,6 +72,8 @@ typedef struct {
typedef
struct
{
int64_t
suid
;
char
*
qmsg
;
// SubPlanToString
SNode
*
node
;
}
STqExecTb
;
typedef
struct
{
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
5d98a319
...
...
@@ -75,6 +75,8 @@ static void destroyTqHandle(void* data) {
}
else
if
(
pData
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
walCloseReader
(
pData
->
pWalReader
);
tqReaderClose
(
pData
->
execHandle
.
pTqReader
);
taosMemoryFreeClear
(
pData
->
execHandle
.
execTb
.
qmsg
);
nodesDestroyNode
(
pData
->
execHandle
.
execTb
.
node
);
}
if
(
pData
->
msg
!=
NULL
)
{
rpcFreeCont
(
pData
->
msg
->
pCont
);
...
...
@@ -655,7 +657,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle
tqHandle
=
{
0
};
pHandle
=
&
tqHandle
;
uint64_t
oldConsumerId
=
pHandle
->
consumerId
;
memcpy
(
pHandle
->
subKey
,
req
.
subKey
,
TSDB_SUBSCRIBE_KEY_LEN
);
pHandle
->
consumerId
=
req
.
newConsumerId
;
pHandle
->
epoch
=
-
1
;
...
...
@@ -700,26 +701,37 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
pHandle
->
pWalReader
=
walOpenReader
(
pVnode
->
pWal
,
NULL
);
pHandle
->
execHandle
.
execTb
.
suid
=
req
.
suid
;
pHandle
->
execHandle
.
execTb
.
qmsg
=
req
.
qmsg
;
req
.
qmsg
=
NULL
;
SArray
*
tbUidList
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
vnodeGetCtbIdList
(
pVnode
,
req
.
suid
,
tbUidList
);
tqDebug
(
"vgId:%d, tq try to get all ctb, suid:%"
PRId64
,
pVnode
->
config
.
vgId
,
req
.
suid
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
tbUid
=
*
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
)
;
tqDebug
(
"vgId:%d, idx %d, uid:%"
PRId64
,
vgId
,
i
,
tbUid
);
if
(
strcmp
(
pHandle
->
execHandle
.
execTb
.
qmsg
,
""
)
!=
0
)
{
if
(
nodesStringToNode
(
pHandle
->
execHandle
.
execTb
.
qmsg
,
&
pHandle
->
execHandle
.
execTb
.
node
)
!=
0
)
{
tqError
(
"nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%"
PRIx64
,
terrstr
(),
pVnode
->
config
.
vgId
,
req
.
subKey
,
pHandle
->
consumerId
);
return
-
1
;
}
}
pHandle
->
execHandle
.
pTqReader
=
tqReaderOpen
(
pVnode
);
tqReaderSetTbUidList
(
pHandle
->
execHandle
.
pTqReader
,
tbUidList
);
taosArrayDestroy
(
tbUidList
);
buildSnapContext
(
handle
.
vnode
,
handle
.
version
,
req
.
suid
,
pHandle
->
execHandle
.
subType
,
pHandle
->
fetchMeta
,
(
SSnapContext
**
)(
&
handle
.
sContext
));
pHandle
->
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
handle
,
vgId
,
NULL
,
req
.
newConsumerId
);
SArray
*
tbUidList
=
NULL
;
ret
=
qGetTableList
(
req
.
suid
,
pVnode
,
pHandle
->
execHandle
.
execTb
.
node
,
&
tbUidList
,
pHandle
->
execHandle
.
task
);
if
(
ret
!=
TDB_CODE_SUCCESS
)
{
tqError
(
"qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%"
PRIx64
,
ret
,
pVnode
->
config
.
vgId
,
req
.
subKey
,
pHandle
->
consumerId
);
taosArrayDestroy
(
tbUidList
);
goto
end
;
}
tqDebug
(
"tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%"
PRIx64
" suid:%"
PRId64
,
pVnode
->
config
.
vgId
,
req
.
subKey
,
pHandle
->
consumerId
,
req
.
suid
);
pHandle
->
execHandle
.
pTqReader
=
tqReaderOpen
(
pVnode
);
tqReaderSetTbUidList
(
pHandle
->
execHandle
.
pTqReader
,
tbUidList
);
taosArrayDestroy
(
tbUidList
);
}
taosHashPut
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pHandle
,
sizeof
(
STqHandle
));
tqDebug
(
"try to persist handle %s consumer:0x%"
PRIx64
" , old consumer:0x%"
PRIx64
,
req
.
subKey
,
pHandle
->
consumerId
,
oldConsumerId
);
tqDebug
(
"try to persist handle %s consumer:0x%"
PRIx64
,
req
.
subKey
,
pHandle
->
consumerId
);
ret
=
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
goto
end
;
}
else
{
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
5d98a319
...
...
@@ -37,6 +37,7 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
}
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
if
(
tEncodeI64
(
pEncoder
,
pHandle
->
execHandle
.
execTb
.
suid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pHandle
->
execHandle
.
execTb
.
qmsg
)
<
0
)
return
-
1
;
}
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
...
...
@@ -64,6 +65,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
}
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pHandle
->
execHandle
.
execTb
.
suid
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pHandle
->
execHandle
.
execTb
.
qmsg
)
<
0
)
return
-
1
;
}
tEndDecode
(
pDecoder
);
return
0
;
...
...
@@ -337,20 +339,27 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
}
else
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
handle
.
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
SArray
*
tbUidList
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
vnodeGetCtbIdList
(
pTq
->
pVnode
,
handle
.
execHandle
.
execTb
.
suid
,
tbUidList
);
tqDebug
(
"vgId:%d, tq try to get all ctb, suid:%"
PRId64
,
pTq
->
pVnode
->
config
.
vgId
,
handle
.
execHandle
.
execTb
.
suid
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
tbUid
=
*
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
tqDebug
(
"vgId:%d, idx %d, uid:%"
PRId64
,
vgId
,
i
,
tbUid
);
if
(
strcmp
(
handle
.
execHandle
.
execTb
.
qmsg
,
""
)
!=
0
)
{
if
(
nodesStringToNode
(
handle
.
execHandle
.
execTb
.
qmsg
,
&
handle
.
execHandle
.
execTb
.
node
)
!=
0
)
{
tqError
(
"nodesStringToNode error in sub stable, since %s"
,
terrstr
());
return
-
1
;
}
}
handle
.
execHandle
.
pTqReader
=
tqReaderOpen
(
pTq
->
pVnode
);
tqReaderSetTbUidList
(
handle
.
execHandle
.
pTqReader
,
tbUidList
);
taosArrayDestroy
(
tbUidList
);
buildSnapContext
(
reader
.
vnode
,
reader
.
version
,
handle
.
execHandle
.
execTb
.
suid
,
handle
.
execHandle
.
subType
,
handle
.
fetchMeta
,
(
SSnapContext
**
)(
&
reader
.
sContext
));
handle
.
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
reader
,
vgId
,
NULL
,
0
);
SArray
*
tbUidList
=
NULL
;
int
ret
=
qGetTableList
(
handle
.
execHandle
.
execTb
.
suid
,
pTq
->
pVnode
,
handle
.
execHandle
.
execTb
.
node
,
&
tbUidList
,
handle
.
execHandle
.
task
);
if
(
ret
!=
TDB_CODE_SUCCESS
)
{
tqError
(
"qGetTableList error:%d handle %s consumer:0x%"
PRIx64
,
ret
,
handle
.
subKey
,
handle
.
consumerId
);
taosArrayDestroy
(
tbUidList
);
goto
end
;
}
tqDebug
(
"vgId:%d, tq try to get ctb for stb subscribe, suid:%"
PRId64
,
pTq
->
pVnode
->
config
.
vgId
,
handle
.
execHandle
.
execTb
.
suid
);
handle
.
execHandle
.
pTqReader
=
tqReaderOpen
(
pTq
->
pVnode
);
tqReaderSetTbUidList
(
handle
.
execHandle
.
pTqReader
,
tbUidList
);
taosArrayDestroy
(
tbUidList
);
}
tqDebug
(
"tq restore %s consumer %"
PRId64
" vgId:%d"
,
handle
.
subKey
,
handle
.
consumerId
,
vgId
);
taosWLockLatch
(
&
pTq
->
lock
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
5d98a319
...
...
@@ -1083,34 +1083,15 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
}
else
if
(
pTqHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
if
(
isAdd
)
{
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pTq
->
pVnode
->
pMeta
,
0
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
++
i
)
{
uint64_t
*
id
=
(
uint64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
int32_t
code
=
metaReaderGetTableEntryByUidCache
(
&
mr
,
*
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tqError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s"
,
*
id
,
tstrerror
(
terrno
));
continue
;
}
tDecoderClear
(
&
mr
.
coder
);
if
(
mr
.
me
.
type
!=
TSDB_CHILD_TABLE
||
mr
.
me
.
ctbEntry
.
suid
!=
pTqHandle
->
execHandle
.
execTb
.
suid
)
{
tqDebug
(
"table uid %"
PRId64
" does not add to tq handle"
,
*
id
);
continue
;
}
tqDebug
(
"table uid %"
PRId64
" add to tq handle"
,
*
id
);
taosArrayPush
(
qa
,
id
);
SArray
*
list
=
NULL
;
int
ret
=
qGetTableList
(
pTqHandle
->
execHandle
.
execTb
.
suid
,
pTq
->
pVnode
,
pTqHandle
->
execHandle
.
execTb
.
node
,
&
list
,
pTqHandle
->
execHandle
.
task
);
if
(
ret
!=
TDB_CODE_SUCCESS
)
{
tqError
(
"qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%"
PRIx64
,
ret
,
pTqHandle
->
subKey
,
pTqHandle
->
consumerId
);
taosArrayDestroy
(
list
);
return
ret
;
}
metaReaderClear
(
&
mr
);
if
(
taosArrayGetSize
(
qa
)
>
0
)
{
tqReaderAddTbUidList
(
pTqHandle
->
execHandle
.
pTqReader
,
qa
);
}
taosArrayDestroy
(
qa
);
tqReaderSetTbUidList
(
pTqHandle
->
execHandle
.
pTqReader
,
list
);
taosArrayDestroy
(
list
);
}
else
{
tqReaderRemoveTbUidList
(
pTqHandle
->
execHandle
.
pTqReader
,
tbUidList
);
}
...
...
source/libs/executor/src/executil.c
浏览文件 @
5d98a319
...
...
@@ -960,6 +960,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
SArray
*
pBlockList
=
NULL
;
SSDataBlock
*
pResBlock
=
NULL
;
SScalarParam
output
=
{
0
};
SArray
*
pUidTagList
=
NULL
;
tagFilterAssist
ctx
=
{
0
};
ctx
.
colHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_SMALLINT
),
false
,
HASH_NO_LOCK
);
...
...
@@ -979,7 +980,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
SDataType
type
=
{.
type
=
TSDB_DATA_TYPE_BOOL
,
.
bytes
=
sizeof
(
bool
)};
// int64_t stt = taosGetTimestampUs();
SArray
*
pUidTagList
=
taosArrayInit
(
10
,
sizeof
(
STUidTagInfo
));
pUidTagList
=
taosArrayInit
(
10
,
sizeof
(
STUidTagInfo
));
copyExistedUids
(
pUidTagList
,
pUidList
);
FilterCondType
condType
=
checkTagCond
(
pTagCond
);
...
...
@@ -1151,6 +1152,21 @@ _end:
return
code
;
}
int32_t
qGetTableList
(
int64_t
suid
,
void
*
pVnode
,
void
*
node
,
SArray
**
tableList
,
void
*
pTaskInfo
){
SSubplan
*
pSubplan
=
(
SSubplan
*
)
node
;
SScanPhysiNode
pNode
=
{
0
};
pNode
.
suid
=
suid
;
pNode
.
uid
=
suid
;
pNode
.
tableType
=
TSDB_SUPER_TABLE
;
STableListInfo
*
pTableListInfo
=
tableListCreate
();
uint8_t
digest
[
17
]
=
{
0
};
int
code
=
getTableList
(
pVnode
,
&
pNode
,
pSubplan
?
pSubplan
->
pTagCond
:
NULL
,
pSubplan
?
pSubplan
->
pTagIndexCond
:
NULL
,
pTableListInfo
,
digest
,
"qGetTableList"
,
&
((
SExecTaskInfo
*
)
pTaskInfo
)
->
storageAPI
);
*
tableList
=
pTableListInfo
->
pTableList
;
pTableListInfo
->
pTableList
=
NULL
;
tableListDestroy
(
pTableListInfo
);
return
code
;
}
size_t
getTableTagsBufLen
(
const
SNodeList
*
pGroups
)
{
size_t
keyLen
=
0
;
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
5d98a319
...
...
@@ -921,6 +921,7 @@ void nodesDestroyNode(SNode* pNode) {
break
;
case
QUERY_NODE_CREATE_TOPIC_STMT
:
nodesDestroyNode
(((
SCreateTopicStmt
*
)
pNode
)
->
pQuery
);
nodesDestroyNode
(((
SCreateTopicStmt
*
)
pNode
)
->
pWhere
);
break
;
case
QUERY_NODE_DROP_TOPIC_STMT
:
// no pointer field
case
QUERY_NODE_DROP_CGROUP_STMT
:
// no pointer field
...
...
source/libs/parser/inc/parAst.h
浏览文件 @
5d98a319
...
...
@@ -207,7 +207,7 @@ SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists,
SNode
*
createCreateTopicStmtUseDb
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
SToken
*
pTopicName
,
SToken
*
pSubDbName
,
bool
withMeta
);
SNode
*
createCreateTopicStmtUseTable
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
SToken
*
pTopicName
,
SNode
*
pRealTable
,
bool
withMeta
);
bool
withMeta
,
SNode
*
pWhere
);
SNode
*
createDropTopicStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
SToken
*
pTopicName
);
SNode
*
createDropCGroupStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
const
SToken
*
pCGroupId
,
SToken
*
pTopicName
);
SNode
*
createAlterLocalStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pConfig
,
const
SToken
*
pValue
);
...
...
source/libs/parser/inc/parUtil.h
浏览文件 @
5d98a319
...
...
@@ -115,6 +115,7 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName,
int32_t
getTableCfgFromCache
(
SParseMetaCache
*
pMetaCache
,
const
SName
*
pName
,
STableCfg
**
pOutput
);
int32_t
getDnodeListFromCache
(
SParseMetaCache
*
pMetaCache
,
SArray
**
pDnodes
);
void
destoryParseMetaCache
(
SParseMetaCache
*
pMetaCache
,
bool
request
);
SNode
*
createSelectStmtImpl
(
bool
isDistinct
,
SNodeList
*
pProjectionList
,
SNode
*
pTable
);
#ifdef __cplusplus
}
...
...
source/libs/parser/inc/sql.y
浏览文件 @
5d98a319
...
...
@@ -543,9 +543,9 @@ cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C).
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
WITH META AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, true); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
AS STABLE full_table_name(C)
. { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, false
); }
AS STABLE full_table_name(C)
where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, false, D
); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
WITH META AS STABLE full_table_name(C)
. { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, true
); }
WITH META AS STABLE full_table_name(C)
where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, true, D
); }
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); }
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
5d98a319
...
...
@@ -822,16 +822,9 @@ SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) {
SNode
*
createSelectStmt
(
SAstCreateContext
*
pCxt
,
bool
isDistinct
,
SNodeList
*
pProjectionList
,
SNode
*
pTable
)
{
CHECK_PARSER_STATUS
(
pCxt
);
S
SelectStmt
*
select
=
(
SSelectStmt
*
)
nodesMakeNode
(
QUERY_NODE_SELECT_STMT
);
S
Node
*
select
=
createSelectStmtImpl
(
isDistinct
,
pProjectionList
,
pTable
);
CHECK_OUT_OF_MEM
(
select
);
select
->
isDistinct
=
isDistinct
;
select
->
pProjectionList
=
pProjectionList
;
select
->
pFromTable
=
pTable
;
sprintf
(
select
->
stmtName
,
"%p"
,
select
);
select
->
isTimeLineResult
=
true
;
select
->
onlyHasKeepOrderFunc
=
true
;
select
->
timeRange
=
TSWINDOW_INITIALIZER
;
return
(
SNode
*
)
select
;
return
select
;
}
static
void
setSubquery
(
SNode
*
pStmt
)
{
...
...
@@ -1712,7 +1705,7 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST
}
SNode
*
createCreateTopicStmtUseTable
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
SToken
*
pTopicName
,
SNode
*
pRealTable
,
bool
withMeta
)
{
bool
withMeta
,
SNode
*
pWhere
)
{
CHECK_PARSER_STATUS
(
pCxt
);
if
(
!
checkTopicName
(
pCxt
,
pTopicName
))
{
return
NULL
;
...
...
@@ -1722,6 +1715,8 @@ SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists,
COPY_STRING_FORM_ID_TOKEN
(
pStmt
->
topicName
,
pTopicName
);
pStmt
->
ignoreExists
=
ignoreExists
;
pStmt
->
withMeta
=
withMeta
;
pStmt
->
pWhere
=
pWhere
;
strcpy
(
pStmt
->
subDbName
,
((
SRealTableNode
*
)
pRealTable
)
->
table
.
dbName
);
strcpy
(
pStmt
->
subSTbName
,
((
SRealTableNode
*
)
pRealTable
)
->
table
.
tableName
);
nodesDestroyNode
(
pRealTable
);
...
...
source/libs/parser/src/parAstParser.c
浏览文件 @
5d98a319
...
...
@@ -355,6 +355,11 @@ static int32_t collectMetaKeyFromCreateTopic(SCollectMetaKeyCxt* pCxt, SCreateTo
if
(
NULL
!=
pStmt
->
pQuery
)
{
return
collectMetaKeyFromQuery
(
pCxt
,
pStmt
->
pQuery
);
}
if
(
NULL
!=
pStmt
->
pWhere
)
{
int32_t
code
=
collectMetaKeyFromRealTableImpl
(
pCxt
,
pStmt
->
subDbName
,
pStmt
->
subSTbName
,
AUTH_TYPE_READ
);
return
code
;
}
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
5d98a319
...
...
@@ -55,6 +55,13 @@ typedef struct STranslateContext {
bool
showRewrite
;
}
STranslateContext
;
typedef
struct
SBuildTopicContext
{
bool
colExists
;
bool
colNotFound
;
STableMeta
*
pMeta
;
SNodeList
*
pTags
;
}
SBuildTopicContext
;
typedef
struct
SFullDatabaseName
{
char
fullDbName
[
TSDB_DB_FNAME_LEN
];
}
SFullDatabaseName
;
...
...
@@ -5823,6 +5830,9 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
toName
(
pCxt
->
pParseCxt
->
acctId
,
pStmt
->
subDbName
,
pStmt
->
subSTbName
,
&
name
);
tNameGetFullDbName
(
&
name
,
pReq
->
subDbName
);
tNameExtractFullName
(
&
name
,
pReq
->
subStbName
);
if
(
pStmt
->
pQuery
!=
NULL
)
{
code
=
nodesNodeToString
(
pStmt
->
pQuery
,
false
,
&
pReq
->
ast
,
NULL
);
}
}
else
if
(
'\0'
!=
pStmt
->
subDbName
[
0
])
{
pReq
->
subType
=
TOPIC_SUB_TYPE__DB
;
tNameSetDbName
(
&
name
,
pCxt
->
pParseCxt
->
acctId
,
pStmt
->
subDbName
,
strlen
(
pStmt
->
subDbName
));
...
...
@@ -5845,12 +5855,108 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
return
code
;
}
static
int32_t
addTagList
(
SNodeList
**
ppList
,
SNode
*
pNode
)
{
if
(
NULL
==
*
ppList
)
{
*
ppList
=
nodesMakeList
();
}
nodesListStrictAppend
(
*
ppList
,
pNode
);
return
TSDB_CODE_SUCCESS
;
}
static
EDealRes
checkColumnTagsInCond
(
SNode
*
pNode
,
void
*
pContext
)
{
SBuildTopicContext
*
pCxt
=
(
SBuildTopicContext
*
)
pContext
;
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
ETableColumnType
type
;
getColumnTypeFromMeta
(
pCxt
->
pMeta
,
((
SColumnNode
*
)
pNode
)
->
colName
,
&
type
);
if
(
type
==
TCOL_TYPE_COLUMN
)
{
pCxt
->
colExists
=
true
;
return
DEAL_RES_ERROR
;
}
else
if
(
type
==
TCOL_TYPE_TAG
)
{
addTagList
(
&
pCxt
->
pTags
,
nodesCloneNode
(
pNode
));
}
else
{
pCxt
->
colNotFound
=
true
;
return
DEAL_RES_ERROR
;
}
}
else
if
(
QUERY_NODE_FUNCTION
==
nodeType
(
pNode
))
{
SFunctionNode
*
pFunc
=
(
SFunctionNode
*
)
pNode
;
if
(
0
==
strcasecmp
(
pFunc
->
functionName
,
"tbname"
))
{
addTagList
(
&
pCxt
->
pTags
,
nodesCloneNode
(
pNode
));
}
}
return
DEAL_RES_CONTINUE
;
}
static
int32_t
checkCollectTopicTags
(
STranslateContext
*
pCxt
,
SCreateTopicStmt
*
pStmt
,
STableMeta
*
pMeta
,
SNodeList
**
ppProjection
)
{
SBuildTopicContext
colCxt
=
{.
colExists
=
false
,
.
colNotFound
=
false
,
.
pMeta
=
pMeta
,
.
pTags
=
NULL
};
nodesWalkExprPostOrder
(
pStmt
->
pWhere
,
checkColumnTagsInCond
,
&
colCxt
);
if
(
colCxt
.
colNotFound
)
{
nodesDestroyList
(
colCxt
.
pTags
);
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_SYNTAX_ERROR
,
"Invalid column name"
);
}
else
if
(
colCxt
.
colExists
)
{
nodesDestroyList
(
colCxt
.
pTags
);
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_SYNTAX_ERROR
,
"Columns are forbidden in where clause"
);
}
if
(
NULL
==
colCxt
.
pTags
)
{
// put one column to select
// for (int32_t i = 0; i < pMeta->tableInfo.numOfColumns; ++i) {
SSchema
*
column
=
&
pMeta
->
schema
[
0
];
SColumnNode
*
col
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
strcpy
(
col
->
colName
,
column
->
name
);
strcpy
(
col
->
node
.
aliasName
,
col
->
colName
);
strcpy
(
col
->
node
.
userAlias
,
col
->
colName
);
addTagList
(
&
colCxt
.
pTags
,
(
SNode
*
)
col
);
// }
}
*
ppProjection
=
colCxt
.
pTags
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
buildQueryForTableTopic
(
STranslateContext
*
pCxt
,
SCreateTopicStmt
*
pStmt
,
SNode
**
pSelect
)
{
SParseContext
*
pParCxt
=
pCxt
->
pParseCxt
;
SRequestConnInfo
connInfo
=
{.
pTrans
=
pParCxt
->
pTransporter
,
.
requestId
=
pParCxt
->
requestId
,
.
requestObjRefId
=
pParCxt
->
requestRid
,
.
mgmtEps
=
pParCxt
->
mgmtEpSet
};
SName
name
;
STableMeta
*
pMeta
=
NULL
;
int32_t
code
=
getTableMetaImpl
(
pCxt
,
toName
(
pParCxt
->
acctId
,
pStmt
->
subDbName
,
pStmt
->
subSTbName
,
&
name
),
&
pMeta
);
if
(
code
)
{
taosMemoryFree
(
pMeta
);
return
code
;
}
if
(
TSDB_SUPER_TABLE
!=
pMeta
->
tableType
)
{
taosMemoryFree
(
pMeta
);
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_SYNTAX_ERROR
,
"Only supertable table can be used"
);
}
SNodeList
*
pProjection
=
NULL
;
code
=
checkCollectTopicTags
(
pCxt
,
pStmt
,
pMeta
,
&
pProjection
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
SRealTableNode
*
realTable
=
(
SRealTableNode
*
)
nodesMakeNode
(
QUERY_NODE_REAL_TABLE
);
strcpy
(
realTable
->
table
.
dbName
,
pStmt
->
subDbName
);
strcpy
(
realTable
->
table
.
tableName
,
pStmt
->
subSTbName
);
strcpy
(
realTable
->
table
.
tableAlias
,
pStmt
->
subSTbName
);
*
pSelect
=
createSelectStmtImpl
(
true
,
pProjection
,
(
SNode
*
)
realTable
);
((
SSelectStmt
*
)
*
pSelect
)
->
pWhere
=
nodesCloneNode
(
pStmt
->
pWhere
);
pCxt
->
pParseCxt
->
topicQuery
=
true
;
code
=
translateQuery
(
pCxt
,
*
pSelect
);
}
taosMemoryFree
(
pMeta
);
return
code
;
}
static
int32_t
checkCreateTopic
(
STranslateContext
*
pCxt
,
SCreateTopicStmt
*
pStmt
)
{
if
(
NULL
==
pStmt
->
pQuery
)
{
if
(
NULL
==
pStmt
->
pQuery
&&
NULL
==
pStmt
->
pWhere
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
QUERY_NODE_SELECT_STMT
==
nodeType
(
pStmt
->
pQuery
))
{
if
(
pStmt
->
pWhere
)
{
return
buildQueryForTableTopic
(
pCxt
,
pStmt
,
&
pStmt
->
pQuery
);
}
else
if
(
QUERY_NODE_SELECT_STMT
==
nodeType
(
pStmt
->
pQuery
))
{
SSelectStmt
*
pSelect
=
(
SSelectStmt
*
)
pStmt
->
pQuery
;
if
(
!
pSelect
->
isDistinct
&&
(
NULL
!=
pSelect
->
pFromTable
&&
QUERY_NODE_REAL_TABLE
==
nodeType
(
pSelect
->
pFromTable
))
&&
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
5d98a319
...
...
@@ -667,6 +667,22 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
return
code
;
}
SNode
*
createSelectStmtImpl
(
bool
isDistinct
,
SNodeList
*
pProjectionList
,
SNode
*
pTable
)
{
SSelectStmt
*
select
=
(
SSelectStmt
*
)
nodesMakeNode
(
QUERY_NODE_SELECT_STMT
);
if
(
NULL
==
select
)
{
return
NULL
;
}
select
->
isDistinct
=
isDistinct
;
select
->
pProjectionList
=
pProjectionList
;
select
->
pFromTable
=
pTable
;
sprintf
(
select
->
stmtName
,
"%p"
,
select
);
select
->
isTimeLineResult
=
true
;
select
->
onlyHasKeepOrderFunc
=
true
;
select
->
timeRange
=
TSWINDOW_INITIALIZER
;
return
(
SNode
*
)
select
;
}
static
int32_t
putMetaDataToHash
(
const
char
*
pKey
,
int32_t
len
,
const
SArray
*
pData
,
int32_t
index
,
SHashObj
**
pHash
)
{
if
(
NULL
==
*
pHash
)
{
*
pHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
...
...
source/libs/parser/src/sql.c
浏览文件 @
5d98a319
此差异已折叠。
点击以展开。
source/libs/parser/test/parInitialCTest.cpp
浏览文件 @
5d98a319
...
...
@@ -1145,6 +1145,15 @@ TEST_F(ParserInitialCTest, createTopic) {
setCreateTopicReq
(
"tp1"
,
1
,
"create topic if not exists tp1 with meta as stable st1"
,
nullptr
,
"test"
,
"st1"
,
1
);
run
(
"CREATE TOPIC IF NOT EXISTS tp1 WITH META AS STABLE st1"
);
clearCreateTopicReq
();
setCreateTopicReq
(
"tp1"
,
1
,
"create topic if not exists tp1 as stable st1 where tag1 > 0"
,
nullptr
,
"test"
,
"st1"
);
run
(
"CREATE TOPIC IF NOT EXISTS tp1 AS STABLE st1 WHERE tag1 > 0"
);
clearCreateTopicReq
();
setCreateTopicReq
(
"tp1"
,
1
,
"create topic if not exists tp1 with meta as stable st1 where tag1 > 0"
,
nullptr
,
"test"
,
"st1"
,
1
);
run
(
"CREATE TOPIC IF NOT EXISTS tp1 WITH META AS STABLE st1 WHERE tag1 > 0"
);
clearCreateTopicReq
();
}
/*
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
5d98a319
...
...
@@ -454,6 +454,18 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return
TSDB_CODE_SUCCESS
;
}
void
getColumnTypeFromMeta
(
STableMeta
*
pMeta
,
char
*
pName
,
ETableColumnType
*
pType
)
{
int32_t
nums
=
pMeta
->
tableInfo
.
numOfTags
+
pMeta
->
tableInfo
.
numOfColumns
;
for
(
int32_t
i
=
0
;
i
<
nums
;
++
i
)
{
if
(
0
==
strcmp
(
pName
,
pMeta
->
schema
[
i
].
name
))
{
*
pType
=
(
i
<
pMeta
->
tableInfo
.
numOfColumns
)
?
TCOL_TYPE_COLUMN
:
TCOL_TYPE_TAG
;
return
;
}
}
*
pType
=
TCOL_TYPE_NONE
;
}
void
freeVgInfo
(
SDBVgInfo
*
vgInfo
)
{
if
(
NULL
==
vgInfo
)
{
return
;
...
...
tests/parallel_test/cases.task
浏览文件 @
5d98a319
...
...
@@ -497,6 +497,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/db.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqError.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/schema.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbFilterWhere.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqCheckData.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqCheckData1.py
...
...
tests/script/tsim/tmq/topic.sim
浏览文件 @
5d98a319
...
...
@@ -108,4 +108,15 @@ if $rows != 6 then
return -1
endi
sql create topic topic_stable_1 as stable stb where t1 > 0
sql create topic topic_stable_2 as stable stb where t1 > 0 and t1 < 0
sql create topic topic_stable_3 as stable stb where 1 > 0
sql create topic topic_stable_4 as stable stb where abs(t1) > 0
sql_error create topic topic_stable_5 as stable stb where last(t1) > 0
sql_error create topic topic_stable_5 as stable stb where sum(t1) > 0
sql create topic topic_stable_6 as stable stb where tbname is not null
sql create topic topic_stable_7 as stable stb where tbname > 'a'
sql_error create topic topic_stable_8 as stable stb where tbname > 0 and xx < 0
sql_error create topic topic_stable_9 as stable stb where tbname > 0 and c1 < 0
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/system-test/7-tmq/stbFilterWhere.py
0 → 100644
浏览文件 @
5d98a319
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'replica'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
2
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
2
,
'showMsg'
:
1
,
'showRow'
:
1
}
tmqCom
.
initConsumerTable
()
tmqCom
.
create_database
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
dropFlag
=
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
'vgroups'
],
replica
=
paraDict
[
'replica'
])
tdSql
.
execute
(
"alter database %s wal_retention_period 3600"
%
(
paraDict
[
"dbName"
]))
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
'ctbNum'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
return
def
tmqCase_columnError
(
self
,
topicName
,
condition
):
tdLog
.
printNoPrefix
(
"======== test case error: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'replica'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
2
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
2
,
'showMsg'
:
1
,
'showRow'
:
1
}
tdLog
.
info
(
"create topics from stb with column filter"
)
topicString
=
"create topic %s as stable %s.%s where %s"
%
(
topicName
,
paraDict
[
'dbName'
],
paraDict
[
'stbName'
],
condition
)
tdLog
.
info
(
"create topic sql: %s"
%
topicString
)
tdSql
.
error
(
topicString
)
def
tmqCase
(
self
,
topicName
,
condition
):
tdLog
.
printNoPrefix
(
"======== test case: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'replica'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
2
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
2
,
'showMsg'
:
1
,
'showRow'
:
1
}
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"create topics from stb with tag filter"
)
topicString
=
"create topic %s as stable %s.%s where %s"
%
(
topicName
,
paraDict
[
'dbName'
],
paraDict
[
'stbName'
],
condition
)
tdLog
.
info
(
"create topic sql: %s"
%
topicString
)
tdSql
.
execute
(
topicString
)
queryString
=
"select * from %s.%s where %s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
],
condition
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
topicList
=
topicName
ifcheckdata
=
0
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
paraDict
[
'pollDelay'
],
paraDict
[
"dbName"
],
paraDict
[
'showMsg'
],
paraDict
[
'showRow'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
tdLog
.
exit
(
"0 tmq consume rows error!"
)
tdLog
.
printNoPrefix
(
"======== test case end ...... "
)
def
tmqCase_addNewTable_dropTag
(
self
,
topicName
,
condition
):
tdLog
.
printNoPrefix
(
"======== test case1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'replica'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
2
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
2
,
'showMsg'
:
1
,
'showRow'
:
1
}
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"create topics from stb with tag filter"
)
topicString
=
"create topic %s as stable %s.%s where %s"
%
(
topicName
,
paraDict
[
'dbName'
],
paraDict
[
'stbName'
],
condition
)
tdLog
.
info
(
"create topic sql: %s"
%
topicString
)
tdSql
.
execute
(
topicString
)
queryString
=
"select * from %s.%s where %s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
],
condition
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
()
+
1
)
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
topicList
=
topicName
ifcheckdata
=
0
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
paraDict
[
'pollDelay'
],
paraDict
[
"dbName"
],
paraDict
[
'showMsg'
],
paraDict
[
'showRow'
])
#add new table with one data
tdLog
.
info
(
"start insert data"
)
insertString
=
"insert into %s.tmp using %s.%s tags(1, 1, 1, 't4', 't5') values(now, 1, 1, 1, 'c4', 'c5', now)"
%
(
paraDict
[
'dbName'
],
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
tdSql
.
execute
(
insertString
)
#test drop tag
tdSql
.
error
(
"alter stable %s.%s drop tag t1"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
]))
tdSql
.
execute
(
"alter stable %s.%s drop tag t2"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
]))
tdSql
.
execute
(
"alter stable %s.%s drop column c2"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
]))
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
tdLog
.
exit
(
"0 tmq consume rows error!"
)
tdLog
.
printNoPrefix
(
"======== test case1 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
prepareTestEnv
()
self
.
tmqCase_columnError
(
"t1"
,
"c1 = 4 and t1 = 3"
)
self
.
tmqCase
(
"t2"
,
"2 > 1"
)
self
.
tmqCase
(
"t3"
,
"t4 = 'beijing'"
)
self
.
tmqCase
(
"t4"
,
"t4 > t3"
)
self
.
tmqCase
(
"t5"
,
"t3 = t4"
)
self
.
tmqCase_addNewTable_dropTag
(
"t6"
,
"t1 = 1"
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录