Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
214acb74
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
214acb74
编写于
8月 18, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/3.0_interval_hash_optimize
上级
7e73f361
d9e97881
变更
34
隐藏空白更改
内联
并排
Showing
34 changed file
with
557 addition
and
114 deletion
+557
-114
Jenkinsfile2
Jenkinsfile2
+1
-0
docs/zh/14-reference/14-taosKeeper.md
docs/zh/14-reference/14-taosKeeper.md
+2
-2
docs/zh/14-reference/14-taosx.md
docs/zh/14-reference/14-taosx.md
+0
-4
docs/zh/20-third-party/01-grafana.mdx
docs/zh/20-third-party/01-grafana.mdx
+6
-2
docs/zh/20-third-party/import_dashboard.webp
docs/zh/20-third-party/import_dashboard.webp
+0
-0
examples/c/stream_demo.c
examples/c/stream_demo.c
+3
-4
include/common/tcommon.h
include/common/tcommon.h
+1
-0
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+5
-1
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+3
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+1
-0
source/dnode/mnode/impl/inc/mndStb.h
source/dnode/mnode/impl/inc/mndStb.h
+1
-0
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+19
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+2
-0
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+22
-10
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+12
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-1
source/dnode/vnode/src/meta/metaOpen.c
source/dnode/vnode/src/meta/metaOpen.c
+1
-1
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+111
-3
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+26
-4
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-11
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+22
-20
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+241
-33
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+2
-2
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+3
-0
source/libs/stream/inc/streamInc.h
source/libs/stream/inc/streamInc.h
+0
-1
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+6
-2
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+27
-6
source/libs/stream/src/streamQueue.c
source/libs/stream/src/streamQueue.c
+6
-3
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+10
-2
source/util/src/tarray.c
source/util/src/tarray.c
+1
-0
source/util/src/terror.c
source/util/src/terror.c
+1
-0
tests/script/tsim/db/basic2.sim
tests/script/tsim/db/basic2.sim
+15
-0
tests/system-test/2-query/json_tag.py
tests/system-test/2-query/json_tag.py
+1
-1
未找到文件。
Jenkinsfile2
浏览文件 @
214acb74
...
...
@@ -43,6 +43,7 @@ def pre_test(){
cd ${WKC}
git reset --hard
git clean -fxd
rm -rf examples/rust/
git remote prune origin
git fetch
'''
...
...
docs/zh/14-reference/1
5
-taosKeeper.md
→
docs/zh/14-reference/1
4
-taosKeeper.md
浏览文件 @
214acb74
...
...
@@ -25,7 +25,7 @@ taosKeeper 安装方式:
<!-- taosKeeper 需要在操作系统终端执行,该工具支持两种配置方式:[命令行参数](#命令行参数启动) 和 [配置文件](#配置文件启动)。命令行参数优先级高于配置文件参数。-->
taosKeeper 需要在操作系统终端执行,该工具支持
[
配置文件启动
](
#配置文件启动
)
。
**在运行 taosKeeper 之前要确保 TDengine 集群与 taosAdapter 已经在正确运行。**
**在运行 taosKeeper 之前要确保 TDengine 集群与 taosAdapter 已经在正确运行。**
并且 TDengine 已经开启监控服务,具体请参考:
[
TDengine 监控配置
](
../config/#监控相关
)
。
<!--
### 命令行参数启动
...
...
@@ -93,7 +93,7 @@ taosKeeper 作为 TDengine 监控指标的导出工具,可以将 TDengine 产
```
shell
$
taos
#
#
如上示例,使用 log 库作为监控日志存储位置
>
use log
;
>
select
*
from cluster_info limit 1
;
```
...
...
docs/zh/14-reference/14-taosx.md
已删除
100644 → 0
浏览文件 @
7e73f361
---
sidebar_label
:
taosX
title
:
使用 taosX 在集群间复制数据
---
\ No newline at end of file
docs/zh/20-third-party/01-grafana.mdx
浏览文件 @
214acb74
...
...
@@ -193,7 +193,7 @@ docker run -d \
如上图所示,在 Query 中选中 `TDengine` 数据源,在下方查询框可输入相应 SQL 进行查询,具体说明如下:
- INPUT SQL:输入要查询的语句(该 SQL 语句的结果集应为两列多行),例如:`select
avg(mem_system) from log.dn
where ts >= $from and ts < $to interval($interval)` ,其中,from、to 和 interval 为 TDengine 插件的内置变量,表示从 Grafana 插件面板获取的查询范围和时间间隔。除了内置变量外,`也支持可以使用自定义模板变量`。
- INPUT SQL:输入要查询的语句(该 SQL 语句的结果集应为两列多行),例如:`select
_wstart, avg(mem_system) from log.dnodes_info
where ts >= $from and ts < $to interval($interval)` ,其中,from、to 和 interval 为 TDengine 插件的内置变量,表示从 Grafana 插件面板获取的查询范围和时间间隔。除了内置变量外,`也支持可以使用自定义模板变量`。
- ALIAS BY:可设置当前查询别名。
- GENERATE SQL: 点击该按钮会自动替换相应变量,并生成最终执行的语句。
...
...
@@ -205,7 +205,11 @@ docker run -d \
### 导入 Dashboard
在数据源配置页面,您可以为该数据源导入 TDinsight 面板,作为 TDengine 集群的监控可视化工具。该 Dashboard 已发布在 Grafana:[Dashboard 15167 - TDinsight](https://grafana.com/grafana/dashboards/15167)) 。其他安装方式和相关使用说明请见 [TDinsight 用户手册](/reference/tdinsight/)。
在数据源配置页面,您可以为该数据源导入 TDinsight 面板,作为 TDengine 集群的监控可视化工具。如果 TDengine 服务端为 3.0 版本请选择 `TDinsight for 3.x` 导入。

其中适配 TDengine 2.* 的 Dashboard 已发布在 Grafana:[Dashboard 15167 - TDinsight](https://grafana.com/grafana/dashboards/15167)) 。其他安装方式和相关使用说明请见 [TDinsight 用户手册](/reference/tdinsight/)。
使用 TDengine 作为数据源的其他面板,可以[在此搜索](https://grafana.com/grafana/dashboards/?dataSource=tdengine-datasource)。以下是一份不完全列表:
...
...
docs/zh/20-third-party/import_dashboard.webp
0 → 100644
浏览文件 @
214acb74
文件已添加
examples/c/stream_demo.c
浏览文件 @
214acb74
...
...
@@ -98,10 +98,9 @@ int32_t create_stream() {
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger max_delay 10s into outstb as select _wstart, sum(k) from st1 partition "
"by tbname session(ts, 10s) "
);
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, "
"count(k) from st1 partition by tbname interval(20s) "
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
include/common/tcommon.h
浏览文件 @
214acb74
...
...
@@ -60,6 +60,7 @@ enum {
STREAM_INPUT__DATA_RETRIEVE
,
STREAM_INPUT__GET_RES
,
STREAM_INPUT__CHECKPOINT
,
STREAM_INPUT__DESTROY
,
};
typedef
enum
EStreamType
{
...
...
include/libs/stream/tstream.h
浏览文件 @
214acb74
...
...
@@ -53,6 +53,7 @@ enum {
TASK_SCHED_STATUS__WAITING
,
TASK_SCHED_STATUS__ACTIVE
,
TASK_SCHED_STATUS__FAILED
,
TASK_SCHED_STATUS__DROPPING
,
};
enum
{
...
...
@@ -127,6 +128,10 @@ typedef struct {
int8_t
type
;
}
SStreamCheckpoint
;
typedef
struct
{
int8_t
type
;
}
SStreamTaskDestroy
;
typedef
struct
{
int8_t
type
;
SSDataBlock
*
pBlock
;
...
...
@@ -211,7 +216,6 @@ typedef struct {
void
*
vnode
;
FTbSink
*
tbSinkFunc
;
STSchema
*
pTSchema
;
SHashObj
*
pHash
;
// groupId to tbuid
}
STaskSinkTb
;
typedef
void
FSmaSink
(
void
*
vnode
,
int64_t
smaId
,
const
SArray
*
data
);
...
...
include/util/taoserror.h
浏览文件 @
214acb74
...
...
@@ -291,6 +291,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3)
#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
...
...
source/common/src/tdatablock.c
浏览文件 @
214acb74
...
...
@@ -1343,12 +1343,14 @@ SSDataBlock* createDataBlock() {
SSDataBlock
*
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
if
(
pBlock
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pBlock
->
pDataBlock
=
taosArrayInit
(
4
,
sizeof
(
SColumnInfoData
));
if
(
pBlock
->
pDataBlock
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pBlock
);
return
NULL
;
}
return
pBlock
;
...
...
@@ -1423,6 +1425,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
}
void
colDataDestroy
(
SColumnInfoData
*
pColData
)
{
if
(
!
pColData
)
return
;
if
(
IS_VAR_DATA_TYPE
(
pColData
->
info
.
type
))
{
taosMemoryFreeClear
(
pColData
->
varmeta
.
offset
);
}
else
{
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
214acb74
...
...
@@ -636,6 +636,7 @@ typedef struct {
int32_t
tEncodeSStreamObj
(
SEncoder
*
pEncoder
,
const
SStreamObj
*
pObj
);
int32_t
tDecodeSStreamObj
(
SDecoder
*
pDecoder
,
SStreamObj
*
pObj
);
void
tFreeStreamObj
(
SStreamObj
*
pObj
);
typedef
struct
{
char
streamName
[
TSDB_STREAM_FNAME_LEN
];
...
...
source/dnode/mnode/impl/inc/mndStb.h
浏览文件 @
214acb74
...
...
@@ -34,6 +34,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
SDbObj
*
mndAcquireDbByStb
(
SMnode
*
pMnode
,
const
char
*
stbName
);
int32_t
mndBuildStbFromReq
(
SMnode
*
pMnode
,
SStbObj
*
pDst
,
SMCreateStbReq
*
pCreate
,
SDbObj
*
pDb
);
int32_t
mndAddStbToTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SStbObj
*
pStb
);
void
mndFreeStb
(
SStbObj
*
pStb
);
void
mndExtractDbNameFromStbFullName
(
const
char
*
stbFullName
,
char
*
dst
);
void
mndExtractTbNameFromStbFullName
(
const
char
*
stbFullName
,
char
*
dst
,
int32_t
dstSize
);
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
214acb74
...
...
@@ -116,6 +116,25 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
return
0
;
}
void
tFreeStreamObj
(
SStreamObj
*
pStream
)
{
taosMemoryFree
(
pStream
->
sql
);
taosMemoryFree
(
pStream
->
ast
);
taosMemoryFree
(
pStream
->
physicalPlan
);
if
(
pStream
->
outputSchema
.
nCols
)
taosMemoryFree
(
pStream
->
outputSchema
.
pSchema
);
int32_t
sz
=
taosArrayGetSize
(
pStream
->
tasks
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
pLevel
=
taosArrayGetP
(
pStream
->
tasks
,
i
);
int32_t
taskSz
=
taosArrayGetSize
(
pLevel
);
for
(
int32_t
j
=
0
;
j
<
taskSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGetP
(
pLevel
,
j
);
tFreeSStreamTask
(
pTask
);
}
taosArrayDestroy
(
pLevel
);
}
taosArrayDestroy
(
pStream
->
tasks
);
}
SMqVgEp
*
tCloneSMqVgEp
(
const
SMqVgEp
*
pVgEp
)
{
SMqVgEp
*
pVgEpNew
=
taosMemoryMalloc
(
sizeof
(
SMqVgEp
));
if
(
pVgEpNew
==
NULL
)
return
NULL
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
214acb74
...
...
@@ -424,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
mndAddTaskToTaskSet
(
taskSourceLevel
,
pTask
);
pTask
->
triggerParam
=
0
;
// source
pTask
->
taskLevel
=
TASK_LEVEL__SOURCE
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
214acb74
...
...
@@ -266,6 +266,15 @@ _OVER:
return
pRow
;
}
void
mndFreeStb
(
SStbObj
*
pStb
)
{
taosArrayDestroy
(
pStb
->
pFuncs
);
taosMemoryFreeClear
(
pStb
->
pColumns
);
taosMemoryFreeClear
(
pStb
->
pTags
);
taosMemoryFreeClear
(
pStb
->
comment
);
taosMemoryFreeClear
(
pStb
->
pAst1
);
taosMemoryFreeClear
(
pStb
->
pAst2
);
}
static
int32_t
mndStbActionInsert
(
SSdb
*
pSdb
,
SStbObj
*
pStb
)
{
mTrace
(
"stb:%s, perform insert action, row:%p"
,
pStb
->
name
,
pStb
);
return
0
;
...
...
@@ -273,12 +282,7 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
static
int32_t
mndStbActionDelete
(
SSdb
*
pSdb
,
SStbObj
*
pStb
)
{
mTrace
(
"stb:%s, perform delete action, row:%p"
,
pStb
->
name
,
pStb
);
taosArrayDestroy
(
pStb
->
pFuncs
);
taosMemoryFreeClear
(
pStb
->
pColumns
);
taosMemoryFreeClear
(
pStb
->
pTags
);
taosMemoryFreeClear
(
pStb
->
comment
);
taosMemoryFreeClear
(
pStb
->
pAst1
);
taosMemoryFreeClear
(
pStb
->
pAst2
);
mndFreeStb
(
pStb
);
return
0
;
}
...
...
@@ -2021,8 +2025,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
FOREACH
(
pNode
,
pNodeList
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
if
(
pCol
->
tableId
!=
suid
)
{
mDebug
(
"topic:%s, check colId:%d passed"
,
pTopic
->
name
,
pCol
->
colId
);
if
(
pCol
->
tableId
==
suid
)
{
sdbRelease
(
pSdb
,
pTopic
);
nodesDestroyNode
(
pAst
);
return
-
1
;
...
...
@@ -2045,6 +2048,16 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
pIter
=
sdbFetch
(
pSdb
,
SDB_STREAM
,
pIter
,
(
void
**
)
&
pStream
);
if
(
pIter
==
NULL
)
break
;
if
(
pStream
->
smaId
!=
0
)
{
sdbRelease
(
pSdb
,
pStream
);
continue
;
}
if
(
pStream
->
targetStbUid
==
suid
)
{
sdbRelease
(
pSdb
,
pStream
);
return
-
1
;
}
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pStream
->
ast
,
&
pAst
)
!=
0
)
{
ASSERT
(
0
);
...
...
@@ -2057,8 +2070,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
FOREACH
(
pNode
,
pNodeList
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
if
(
pCol
->
tableId
!=
suid
)
{
mDebug
(
"stream:%s, check colId:%d passed"
,
pStream
->
name
,
pCol
->
colId
);
if
(
pCol
->
tableId
==
suid
)
{
sdbRelease
(
pSdb
,
pStream
);
nodesDestroyNode
(
pAst
);
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
214acb74
...
...
@@ -167,6 +167,9 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
static
int32_t
mndStreamActionDelete
(
SSdb
*
pSdb
,
SStreamObj
*
pStream
)
{
mTrace
(
"stream:%s, perform delete action"
,
pStream
->
name
);
taosWLockLatch
(
&
pStream
->
lock
);
tFreeStreamObj
(
pStream
);
taosWUnLockLatch
(
&
pStream
->
lock
);
return
0
;
}
...
...
@@ -493,10 +496,17 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
stbObj
.
uid
=
pStream
->
targetStbUid
;
if
(
mndAddStbToTrans
(
pMnode
,
pTrans
,
pDb
,
&
stbObj
)
<
0
)
goto
_OVER
;
if
(
mndAddStbToTrans
(
pMnode
,
pTrans
,
pDb
,
&
stbObj
)
<
0
)
{
mndFreeStb
(
&
stbObj
);
goto
_OVER
;
}
tFreeSMCreateStbReq
(
&
createReq
);
mndFreeStb
(
&
stbObj
);
return
0
;
_OVER:
tFreeSMCreateStbReq
(
&
createReq
);
mndReleaseStb
(
pMnode
,
pStb
);
mndReleaseDb
(
pMnode
,
pDb
);
return
-
1
;
...
...
@@ -715,6 +725,7 @@ _OVER:
mndReleaseDb
(
pMnode
,
pDb
);
tFreeSCMCreateStreamReq
(
&
createStreamReq
);
tFreeStreamObj
(
&
streamObj
);
return
code
;
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
214acb74
...
...
@@ -92,8 +92,9 @@ typedef struct SMetaEntry SMetaEntry;
void
metaReaderInit
(
SMetaReader
*
pReader
,
SMeta
*
pMeta
,
int32_t
flags
);
void
metaReaderClear
(
SMetaReader
*
pReader
);
int32_t
metaGetTableEntryByUid
(
SMetaReader
*
pReader
,
tb_uid_t
uid
);
int32_t
metaGetTableTags
(
SMeta
*
pMeta
,
uint64_t
suid
,
SArray
*
uidList
,
SHashObj
*
tags
);
int32_t
metaReadNext
(
SMetaReader
*
pReader
);
const
void
*
metaGetTableTagVal
(
SMetaEntry
*
pEntry
,
int16_t
type
,
STagVal
*
tagVal
);
const
void
*
metaGetTableTagVal
(
void
*
tag
,
int16_t
type
,
STagVal
*
tagVal
);
int
metaGetTableNameByUid
(
void
*
meta
,
uint64_t
uid
,
char
*
tbName
);
typedef
struct
SMetaFltParam
{
...
...
source/dnode/vnode/src/meta/metaOpen.c
浏览文件 @
214acb74
...
...
@@ -87,7 +87,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
}
// open pCtbIdx
ret
=
tdbTbOpen
(
"ctb.idx"
,
sizeof
(
SCtbIdxKey
),
0
,
ctbIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pCtbIdx
);
ret
=
tdbTbOpen
(
"ctb.idx"
,
sizeof
(
SCtbIdxKey
),
-
1
,
ctbIdxKeyCmpr
,
pMeta
->
pEnv
,
&
pMeta
->
pCtbIdx
);
if
(
ret
<
0
)
{
metaError
(
"vgId:%d, failed to open meta child table index since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
214acb74
...
...
@@ -53,6 +53,80 @@ _err:
return
-
1
;
}
// int metaGetTableEntryByUidTest(void* meta, SArray *uidList) {
//
// SArray* readerList = taosArrayInit(taosArrayGetSize(uidList), sizeof(SMetaReader));
// SArray* uidVersion = taosArrayInit(taosArrayGetSize(uidList), sizeof(STbDbKey));
// SMeta *pMeta = meta;
// int64_t version;
// SHashObj *uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
//
// int64_t stt1 = taosGetTimestampUs();
// for(int i = 0; i < taosArrayGetSize(uidList); i++) {
// void* ppVal = NULL;
// int vlen = 0;
// uint64_t * uid = taosArrayGet(uidList, i);
// // query uid.idx
// if (tdbTbGet(pMeta->pUidIdx, uid, sizeof(*uid), &ppVal, &vlen) < 0) {
// continue;
// }
// version = *(int64_t *)ppVal;
//
// STbDbKey tbDbKey = {.version = version, .uid = *uid};
// taosArrayPush(uidVersion, &tbDbKey);
// taosHashPut(uHash, uid, sizeof(int64_t), ppVal, sizeof(int64_t));
// }
// int64_t stt2 = taosGetTimestampUs();
// qDebug("metaGetTableEntryByUidTest1 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt2-stt1);
//
// TBC *pCur = NULL;
// tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
// tdbTbcMoveToFirst(pCur);
// void *pKey = NULL;
// int kLen = 0;
//
// while(1){
// SMetaReader pReader = {0};
// int32_t ret = tdbTbcNext(pCur, &pKey, &kLen, &pReader.pBuf, &pReader.szBuf);
// if (ret < 0) break;
// STbDbKey *tmp = (STbDbKey*)pKey;
// int64_t *ver = (int64_t*)taosHashGet(uHash, &tmp->uid, sizeof(int64_t));
// if(ver == NULL || *ver != tmp->version) continue;
// taosArrayPush(readerList, &pReader);
// }
// tdbTbcClose(pCur);
//
// taosArrayClear(readerList);
// int64_t stt3 = taosGetTimestampUs();
// qDebug("metaGetTableEntryByUidTest2 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt3-stt2);
// for(int i = 0; i < taosArrayGetSize(uidVersion); i++) {
// SMetaReader pReader = {0};
//
// STbDbKey *tbDbKey = taosArrayGet(uidVersion, i);
// // query table.db
// if (tdbTbGet(pMeta->pTbDb, tbDbKey, sizeof(STbDbKey), &pReader.pBuf, &pReader.szBuf) < 0) {
// continue;
// }
// taosArrayPush(readerList, &pReader);
// }
// int64_t stt4 = taosGetTimestampUs();
// qDebug("metaGetTableEntryByUidTest3 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt4-stt3);
//
// for(int i = 0; i < taosArrayGetSize(readerList); i++){
// SMetaReader* pReader = taosArrayGet(readerList, i);
// metaReaderInit(pReader, meta, 0);
// // decode the entry
// tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
//
// if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
// }
// metaReaderClear(pReader);
// }
// int64_t stt5 = taosGetTimestampUs();
// qDebug("metaGetTableEntryByUidTest4 rows:%d, cost:%ld us", taosArrayGetSize(readerList), stt5-stt4);
// return 0;
// }
int
metaGetTableEntryByUid
(
SMetaReader
*
pReader
,
tb_uid_t
uid
)
{
SMeta
*
pMeta
=
pReader
->
pMeta
;
int64_t
version
;
...
...
@@ -749,9 +823,8 @@ SArray *metaGetSmaTbUids(SMeta *pMeta) {
#endif
const
void
*
metaGetTableTagVal
(
SMetaEntry
*
pEntry
,
int16_t
type
,
STagVal
*
val
)
{
ASSERT
(
pEntry
->
type
==
TSDB_CHILD_TABLE
);
STag
*
tag
=
(
STag
*
)
pEntry
->
ctbEntry
.
pTags
;
const
void
*
metaGetTableTagVal
(
void
*
pTag
,
int16_t
type
,
STagVal
*
val
)
{
STag
*
tag
=
(
STag
*
)
pTag
;
if
(
type
==
TSDB_DATA_TYPE_JSON
)
{
return
tag
;
}
...
...
@@ -853,6 +926,9 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
break
;
}
}
if
(
p
->
suid
!=
pKey
->
suid
)
{
break
;
}
first
=
false
;
if
(
p
!=
NULL
)
{
int32_t
cmp
=
(
*
param
->
filterFunc
)(
p
->
data
,
pKey
->
data
,
pKey
->
type
);
...
...
@@ -889,6 +965,38 @@ END:
return
ret
;
}
int32_t
metaGetTableTags
(
SMeta
*
pMeta
,
uint64_t
suid
,
SArray
*
uidList
,
SHashObj
*
tags
)
{
SMCtbCursor
*
pCur
=
metaOpenCtbCursor
(
pMeta
,
suid
);
SHashObj
*
uHash
=
NULL
;
size_t
len
=
taosArrayGetSize
(
uidList
);
// len > 0 means there already have uids
if
(
len
>
0
)
{
uHash
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
for
(
int
i
=
0
;
i
<
len
;
i
++
)
{
int64_t
*
uid
=
taosArrayGet
(
uidList
,
i
);
taosHashPut
(
uHash
,
uid
,
sizeof
(
int64_t
),
&
i
,
sizeof
(
i
));
}
}
while
(
1
)
{
tb_uid_t
id
=
metaCtbCursorNext
(
pCur
);
if
(
id
==
0
)
{
break
;
}
if
(
len
>
0
&&
taosHashGet
(
uHash
,
&
id
,
sizeof
(
int64_t
))
==
NULL
)
{
continue
;
}
else
if
(
len
==
0
)
{
taosArrayPush
(
uidList
,
&
id
);
}
taosHashPut
(
tags
,
&
id
,
sizeof
(
int64_t
),
pCur
->
pVal
,
pCur
->
vLen
);
}
taosHashCleanup
(
uHash
);
metaCloseCtbCursor
(
pCur
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
metaGetInfo
(
SMeta
*
pMeta
,
int64_t
uid
,
SMetaInfo
*
pInfo
)
{
int32_t
code
=
0
;
void
*
pData
=
NULL
;
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
214acb74
...
...
@@ -180,11 +180,29 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche
}
int
metaCreateSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
)
{
SMetaEntry
me
=
{
0
};
SMetaEntry
me
=
{
0
};
int
kLen
=
0
;
int
vLen
=
0
;
const
void
*
pKey
=
NULL
;
const
void
*
pVal
=
NULL
;
void
*
pBuf
=
NULL
;
int32_t
szBuf
=
0
;
void
*
p
=
NULL
;
// validate req
if
(
tdbTbGet
(
pMeta
->
pNameIdx
,
pReq
->
name
,
strlen
(
pReq
->
name
),
NULL
,
NULL
)
==
0
)
{
return
0
;
void
*
pData
=
NULL
;
int
nData
=
0
;
if
(
tdbTbGet
(
pMeta
->
pNameIdx
,
pReq
->
name
,
strlen
(
pReq
->
name
)
+
1
,
&
pData
,
&
nData
)
==
0
)
{
tb_uid_t
uid
=
*
(
tb_uid_t
*
)
pData
;
tdbFree
(
pData
);
SMetaInfo
info
;
metaGetInfo
(
pMeta
,
uid
,
&
info
);
if
(
info
.
uid
==
info
.
suid
)
{
return
0
;
}
else
{
terrno
=
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
;
return
-
1
;
}
}
// set structs
...
...
@@ -865,6 +883,9 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaUpdateTagIdx
(
pMeta
,
&
ctbEntry
);
}
SCtbIdxKey
ctbIdxKey
=
{.
suid
=
ctbEntry
.
ctbEntry
.
suid
,
.
uid
=
uid
};
tdbTbUpsert
(
pMeta
->
pCtbIdx
,
&
ctbIdxKey
,
sizeof
(
ctbIdxKey
),
ctbEntry
.
ctbEntry
.
pTags
,
((
STag
*
)(
ctbEntry
.
ctbEntry
.
pTags
))
->
len
,
&
pMeta
->
txn
);
tDecoderClear
(
&
dc1
);
tDecoderClear
(
&
dc2
);
if
(
ctbEntry
.
ctbEntry
.
pTags
)
taosMemoryFree
((
void
*
)
ctbEntry
.
ctbEntry
.
pTags
);
...
...
@@ -1069,7 +1090,8 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
static
int
metaUpdateCtbIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
)
{
SCtbIdxKey
ctbIdxKey
=
{.
suid
=
pME
->
ctbEntry
.
suid
,
.
uid
=
pME
->
uid
};
return
tdbTbInsert
(
pMeta
->
pCtbIdx
,
&
ctbIdxKey
,
sizeof
(
ctbIdxKey
),
NULL
,
0
,
&
pMeta
->
txn
);
return
tdbTbInsert
(
pMeta
->
pCtbIdx
,
&
ctbIdxKey
,
sizeof
(
ctbIdxKey
),
pME
->
ctbEntry
.
pTags
,
((
STag
*
)(
pME
->
ctbEntry
.
pTags
))
->
len
,
&
pMeta
->
txn
);
}
int
metaCreateTagIdxKey
(
tb_uid_t
suid
,
int32_t
cid
,
const
void
*
pTagData
,
int32_t
nTagData
,
int8_t
type
,
tb_uid_t
uid
,
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
214acb74
...
...
@@ -628,8 +628,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
}
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
)
{
int32_t
code
=
0
;
if
(
pTask
->
taskLevel
==
TASK_LEVEL__AGG
)
{
ASSERT
(
taosArrayGetSize
(
pTask
->
childEpInfo
)
!=
0
);
}
...
...
@@ -640,8 +638,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask
->
outputQueue
=
streamQueueOpen
();
if
(
pTask
->
inputQueue
==
NULL
||
pTask
->
outputQueue
==
NULL
)
{
code
=
-
1
;
goto
FAIL
;
return
-
1
;
}
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
...
...
@@ -686,14 +683,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
streamSetupTrigger
(
pTask
);
tqInfo
(
"
deploy
stream task on vg %d, task id %d, child id %d"
,
TD_VID
(
pTq
->
pVnode
),
pTask
->
taskId
,
tqInfo
(
"
expand
stream task on vg %d, task id %d, child id %d"
,
TD_VID
(
pTq
->
pVnode
),
pTask
->
taskId
,
pTask
->
selfChildId
);
FAIL:
if
(
pTask
->
inputQueue
)
streamQueueClose
(
pTask
->
inputQueue
);
if
(
pTask
->
outputQueue
)
streamQueueClose
(
pTask
->
outputQueue
);
// TODO free executor
return
code
;
return
0
;
}
int32_t
tqProcessTaskDeployReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
)
{
...
...
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
214acb74
...
...
@@ -231,34 +231,35 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
deleteReq
.
deleteReqs
=
taosArrayInit
(
0
,
sizeof
(
SSingleDeleteReq
));
SSubmitReq
*
p
Req
=
tqBlockToSubmit
(
pVnode
,
pRes
,
pTask
->
tbSink
.
pTSchema
,
true
,
pTask
->
tbSink
.
stbUid
,
pTask
->
tbSink
.
stbFullName
,
&
deleteReq
);
SSubmitReq
*
submit
Req
=
tqBlockToSubmit
(
pVnode
,
pRes
,
pTask
->
tbSink
.
pTSchema
,
true
,
pTask
->
tbSink
.
stbUid
,
pTask
->
tbSink
.
stbFullName
,
&
deleteReq
);
tqDebug
(
"vgId:%d, task %d convert blocks over, put into write-queue"
,
TD_VID
(
pVnode
),
pTask
->
taskId
);
int32_t
code
;
int32_t
len
;
tEncodeSize
(
tEncodeSBatchDeleteReq
,
&
deleteReq
,
len
,
code
);
if
(
code
<
0
)
{
//
ASSERT
(
0
);
}
SEncoder
encoder
;
void
*
buf
=
rpcMallocCont
(
len
+
sizeof
(
SMsgHead
));
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncoderInit
(
&
encoder
,
abuf
,
len
);
tEncodeSBatchDeleteReq
(
&
encoder
,
&
deleteReq
);
tEncoderClear
(
&
encoder
);
if
(
taosArrayGetSize
(
deleteReq
.
deleteReqs
)
!=
0
)
{
int32_t
code
;
int32_t
len
;
tEncodeSize
(
tEncodeSBatchDeleteReq
,
&
deleteReq
,
len
,
code
);
if
(
code
<
0
)
{
//
ASSERT
(
0
);
}
SEncoder
encoder
;
void
*
serializedDeleteReq
=
rpcMallocCont
(
len
+
sizeof
(
SMsgHead
));
void
*
abuf
=
POINTER_SHIFT
(
serializedDeleteReq
,
sizeof
(
SMsgHead
));
tEncoderInit
(
&
encoder
,
abuf
,
len
);
tEncodeSBatchDeleteReq
(
&
encoder
,
&
deleteReq
);
tEncoderClear
(
&
encoder
);
((
SMsgHead
*
)
buf
)
->
vgId
=
pVnode
->
config
.
vgId
;
((
SMsgHead
*
)
serializedDeleteReq
)
->
vgId
=
pVnode
->
config
.
vgId
;
if
(
taosArrayGetSize
(
deleteReq
.
deleteReqs
)
!=
0
)
{
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_BATCH_DEL
,
.
pCont
=
buf
,
.
pCont
=
serializedDeleteReq
,
.
contLen
=
len
+
sizeof
(
SMsgHead
),
};
if
(
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
msg
)
!=
0
)
{
rpcFreeCont
(
serializedDeleteReq
);
tqDebug
(
"failed to put into write-queue since %s"
,
terrstr
());
}
}
...
...
@@ -268,11 +269,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
// build write msg
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_SUBMIT
,
.
pCont
=
p
Req
,
.
contLen
=
ntohl
(
p
Req
->
length
),
.
pCont
=
submit
Req
,
.
contLen
=
ntohl
(
submit
Req
->
length
),
};
if
(
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
msg
)
!=
0
)
{
rpcFreeCont
(
submitReq
);
tqDebug
(
"failed to put into write-queue since %s"
,
terrstr
());
}
}
source/libs/executor/src/executil.c
浏览文件 @
214acb74
...
...
@@ -221,7 +221,7 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
STagVal
tagVal
=
{
0
};
tagVal
.
cid
=
pSColumnNode
->
colId
;
const
char
*
p
=
metaGetTableTagVal
(
&
mr
->
me
,
pSColumnNode
->
node
.
resType
.
type
,
&
tagVal
);
const
char
*
p
=
metaGetTableTagVal
(
mr
->
me
.
ctbEntry
.
pTags
,
pSColumnNode
->
node
.
resType
.
type
,
&
tagVal
);
if
(
p
==
NULL
)
{
res
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_NULL
;
}
else
if
(
pSColumnNode
->
node
.
resType
.
type
==
TSDB_DATA_TYPE_JSON
)
{
...
...
@@ -298,6 +298,209 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
tagFilterAssist
{
SHashObj
*
colHash
;
int32_t
index
;
SArray
*
cInfoList
;
}
tagFilterAssist
;
static
EDealRes
getColumn
(
SNode
**
pNode
,
void
*
pContext
)
{
SColumnNode
*
pSColumnNode
=
NULL
;
if
(
QUERY_NODE_COLUMN
==
nodeType
((
*
pNode
)))
{
pSColumnNode
=
*
(
SColumnNode
**
)
pNode
;
}
else
if
(
QUERY_NODE_FUNCTION
==
nodeType
((
*
pNode
))){
SFunctionNode
*
pFuncNode
=
*
(
SFunctionNode
**
)(
pNode
);
if
(
pFuncNode
->
funcType
==
FUNCTION_TYPE_TBNAME
)
{
pSColumnNode
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pSColumnNode
)
{
return
DEAL_RES_ERROR
;
}
pSColumnNode
->
colId
=
-
1
;
pSColumnNode
->
colType
=
COLUMN_TYPE_TBNAME
;
pSColumnNode
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_VARCHAR
;
pSColumnNode
->
node
.
resType
.
bytes
=
TSDB_TABLE_FNAME_LEN
-
1
+
VARSTR_HEADER_SIZE
;
nodesDestroyNode
(
*
pNode
);
*
pNode
=
(
SNode
*
)
pSColumnNode
;
}
else
{
return
DEAL_RES_CONTINUE
;
}
}
else
{
return
DEAL_RES_CONTINUE
;
}
tagFilterAssist
*
pData
=
(
tagFilterAssist
*
)
pContext
;
void
*
data
=
taosHashGet
(
pData
->
colHash
,
&
pSColumnNode
->
colId
,
sizeof
(
pSColumnNode
->
colId
));
if
(
!
data
){
taosHashPut
(
pData
->
colHash
,
&
pSColumnNode
->
colId
,
sizeof
(
pSColumnNode
->
colId
),
pNode
,
sizeof
((
*
pNode
)));
pSColumnNode
->
slotId
=
pData
->
index
++
;
SColumnInfo
cInfo
=
{.
colId
=
pSColumnNode
->
colId
,
.
type
=
pSColumnNode
->
node
.
resType
.
type
,
.
bytes
=
pSColumnNode
->
node
.
resType
.
bytes
};
#if TAG_FILTER_DEBUG
qDebug
(
"tagfilter build column info, slotId:%d, colId:%d, type:%d"
,
pSColumnNode
->
slotId
,
cInfo
.
colId
,
cInfo
.
type
);
#endif
taosArrayPush
(
pData
->
cInfoList
,
&
cInfo
);
}
else
{
SColumnNode
*
col
=
*
(
SColumnNode
**
)
data
;
pSColumnNode
->
slotId
=
col
->
slotId
;
}
return
DEAL_RES_CONTINUE
;
}
static
int32_t
createResultData
(
SDataType
*
pType
,
int32_t
numOfRows
,
SScalarParam
*
pParam
)
{
SColumnInfoData
*
pColumnData
=
taosMemoryCalloc
(
1
,
sizeof
(
SColumnInfoData
));
if
(
pColumnData
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
terrno
;
}
pColumnData
->
info
.
type
=
pType
->
type
;
pColumnData
->
info
.
bytes
=
pType
->
bytes
;
pColumnData
->
info
.
scale
=
pType
->
scale
;
pColumnData
->
info
.
precision
=
pType
->
precision
;
int32_t
code
=
colInfoDataEnsureCapacity
(
pColumnData
,
numOfRows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pColumnData
);
return
terrno
;
}
pParam
->
columnData
=
pColumnData
;
pParam
->
colAlloced
=
true
;
return
TSDB_CODE_SUCCESS
;
}
static
SColumnInfoData
*
getColInfoResult
(
void
*
metaHandle
,
uint64_t
suid
,
SArray
*
uidList
,
SNode
*
pTagCond
){
int32_t
code
=
TSDB_CODE_SUCCESS
;
SArray
*
pBlockList
=
NULL
;
SSDataBlock
*
pResBlock
=
NULL
;
SHashObj
*
tags
=
NULL
;
SScalarParam
output
=
{
0
};
tagFilterAssist
ctx
=
{
0
};
ctx
.
colHash
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_SMALLINT
),
false
,
HASH_NO_LOCK
);
if
(
ctx
.
colHash
==
NULL
){
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
ctx
.
index
=
0
;
ctx
.
cInfoList
=
taosArrayInit
(
4
,
sizeof
(
SColumnInfo
));
if
(
ctx
.
cInfoList
==
NULL
){
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
nodesRewriteExprPostOrder
(
&
pTagCond
,
getColumn
,
(
void
*
)
&
ctx
);
pResBlock
=
createDataBlock
();
if
(
pResBlock
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
ctx
.
cInfoList
);
++
i
)
{
SColumnInfoData
colInfo
=
{{
0
},
0
};
colInfo
.
info
=
*
(
SColumnInfo
*
)
taosArrayGet
(
ctx
.
cInfoList
,
i
);
blockDataAppendColInfo
(
pResBlock
,
&
colInfo
);
}
// int64_t stt = taosGetTimestampUs();
tags
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
code
=
metaGetTableTags
(
metaHandle
,
suid
,
uidList
,
tags
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
goto
end
;
}
int32_t
rows
=
taosArrayGetSize
(
uidList
);
if
(
rows
==
0
){
goto
end
;
}
// int64_t stt1 = taosGetTimestampUs();
// qDebug("generate tag meta rows:%d, cost:%ld us", rows, stt1-stt);
code
=
blockDataEnsureCapacity
(
pResBlock
,
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
goto
end
;
}
// int64_t st = taosGetTimestampUs();
for
(
int32_t
i
=
0
;
i
<
rows
;
i
++
)
{
int64_t
*
uid
=
taosArrayGet
(
uidList
,
i
);
void
*
tag
=
taosHashGet
(
tags
,
uid
,
sizeof
(
int64_t
));
ASSERT
(
tag
);
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pResBlock
->
pDataBlock
);
j
++
){
SColumnInfoData
*
pColInfo
=
(
SColumnInfoData
*
)
taosArrayGet
(
pResBlock
->
pDataBlock
,
j
);
if
(
pColInfo
->
info
.
colId
==
-
1
){
// tbname
char
str
[
TSDB_TABLE_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
metaGetTableNameByUid
(
metaHandle
,
*
uid
,
str
);
colDataAppend
(
pColInfo
,
i
,
str
,
false
);
#if TAG_FILTER_DEBUG
qDebug
(
"tagfilter uid:%ld, tbname:%s"
,
*
uid
,
str
+
2
);
#endif
}
else
{
STagVal
tagVal
=
{
0
};
tagVal
.
cid
=
pColInfo
->
info
.
colId
;
const
char
*
p
=
metaGetTableTagVal
(
tag
,
pColInfo
->
info
.
type
,
&
tagVal
);
if
(
p
==
NULL
||
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_JSON
&&
((
STag
*
)
p
)
->
nTag
==
0
)){
colDataAppend
(
pColInfo
,
i
,
p
,
true
);
}
else
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_JSON
)
{
colDataAppend
(
pColInfo
,
i
,
p
,
false
);
}
else
if
(
IS_VAR_DATA_TYPE
(
pColInfo
->
info
.
type
))
{
char
*
tmp
=
taosMemoryCalloc
(
tagVal
.
nData
+
VARSTR_HEADER_SIZE
+
1
,
1
);
varDataSetLen
(
tmp
,
tagVal
.
nData
);
memcpy
(
tmp
+
VARSTR_HEADER_SIZE
,
tagVal
.
pData
,
tagVal
.
nData
);
colDataAppend
(
pColInfo
,
i
,
tmp
,
false
);
#if TAG_FILTER_DEBUG
qDebug
(
"tagfilter varch:%s"
,
tmp
+
2
);
#endif
taosMemoryFree
(
tmp
);
}
else
{
colDataAppend
(
pColInfo
,
i
,
(
const
char
*
)
&
tagVal
.
i64
,
false
);
#if TAG_FILTER_DEBUG
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_INT
){
qDebug
(
"tagfilter int:%d"
,
*
(
int
*
)(
&
tagVal
.
i64
));
}
else
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_DOUBLE
){
qDebug
(
"tagfilter double:%f"
,
*
(
double
*
)(
&
tagVal
.
i64
));
}
#endif
}
}
}
}
pResBlock
->
info
.
rows
=
rows
;
// int64_t st1 = taosGetTimestampUs();
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
pBlockList
=
taosArrayInit
(
2
,
POINTER_BYTES
);
taosArrayPush
(
pBlockList
,
&
pResBlock
);
SDataType
type
=
{.
type
=
TSDB_DATA_TYPE_BOOL
,
.
bytes
=
sizeof
(
bool
)};
code
=
createResultData
(
&
type
,
rows
,
&
output
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
code
=
scalarCalculate
(
pTagCond
,
pBlockList
,
&
output
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
terrno
=
code
;
}
// int64_t st2 = taosGetTimestampUs();
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
end:
taosHashCleanup
(
tags
);
taosHashCleanup
(
ctx
.
colHash
);
taosArrayDestroy
(
ctx
.
cInfoList
);
blockDataDestroy
(
pResBlock
);
taosArrayDestroy
(
pBlockList
);
return
output
.
columnData
;
}
int32_t
getTableList
(
void
*
metaHandle
,
void
*
pVnode
,
SScanPhysiNode
*
pScanNode
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
STableListInfo
*
pListInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -308,63 +511,68 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
}
uint64_t
tableUid
=
pScanNode
->
uid
;
pListInfo
->
suid
=
pScanNode
->
suid
;
SArray
*
res
=
taosArrayInit
(
8
,
sizeof
(
uint64_t
));
if
(
pScanNode
->
tableType
==
TSDB_SUPER_TABLE
)
{
if
(
pTagIndexCond
)
{
SIndexMetaArg
metaArg
=
{
.
metaEx
=
metaHandle
,
.
idx
=
tsdbGetIdx
(
metaHandle
),
.
ivtIdx
=
tsdbGetIvtIdx
(
metaHandle
),
.
suid
=
tableUid
};
SArray
*
res
=
taosArrayInit
(
8
,
sizeof
(
uint64_t
)
);
// int64_t stt = taosGetTimestampUs(
);
SIdxFltStatus
status
=
SFLT_NOT_INDEX
;
code
=
doFilterTag
(
pTagIndexCond
,
&
metaArg
,
res
,
&
status
);
if
(
code
!=
0
||
status
==
SFLT_NOT_INDEX
)
{
qError
(
"failed to get tableIds from index, reason:%s, suid:%"
PRIu64
,
tstrerror
(
code
),
tableUid
);
// code = TSDB_CODE_INDEX_REBUILDING;
code
=
vnodeGetAllTableList
(
pVnode
,
tableUid
,
pListInfo
->
pTableList
);
}
else
{
qDebug
(
"success to get tableIds, size:%d, suid:%"
PRIu64
,
(
int
)
taosArrayGetSize
(
res
),
tableUid
);
code
=
TDB_CODE_SUCCESS
;
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
res
);
i
++
)
{
STableKeyInfo
info
=
{.
uid
=
*
(
uint64_t
*
)
taosArrayGet
(
res
,
i
),
.
groupId
=
0
};
taosArrayPush
(
pListInfo
->
pTableList
,
&
info
);
}
taosArrayDestroy
(
res
);
}
else
{
code
=
vnodeGetAllTableList
(
pVnode
,
tableUid
,
pListInfo
->
pTableList
);
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get tableIds, reason:%s, suid:%"
PRIu64
,
tstrerror
(
code
),
tableUid
);
terrno
=
code
;
return
code
;
// int64_t stt1 = taosGetTimestampUs();
// qDebug("generate table list, cost:%ld us", stt1-stt);
}
else
if
(
!
pTagCond
){
vnodeGetCtbIdList
(
pVnode
,
pScanNode
->
suid
,
res
);
}
}
else
{
// Create one table group.
STableKeyInfo
info
=
{.
uid
=
tableUid
,
.
groupId
=
0
};
taosArrayPush
(
pListInfo
->
pTableList
,
&
info
);
taosArrayPush
(
res
,
&
tableUid
);
}
if
(
pTagCond
)
{
int32_t
i
=
0
;
while
(
i
<
taosArrayGetSize
(
pListInfo
->
pTableList
))
{
STableKeyInfo
*
info
=
taosArrayGet
(
pListInfo
->
pTableList
,
i
);
bool
qualified
=
true
;
code
=
isQualifiedTable
(
info
,
pTagCond
,
metaHandle
,
&
qualified
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
SColumnInfoData
*
pColInfoData
=
getColInfoResult
(
metaHandle
,
pListInfo
->
suid
,
res
,
pTagCond
);
if
(
terrno
!=
TDB_CODE_SUCCESS
){
colDataDestroy
(
pColInfoData
);
taosMemoryFreeClear
(
pColInfoData
);
taosArrayDestroy
(
res
);
return
terrno
;
}
if
(
!
qualified
)
{
taosArrayRemove
(
pListInfo
->
pTableList
,
i
);
int32_t
i
=
0
;
int32_t
j
=
0
;
int32_t
len
=
taosArrayGetSize
(
res
);
while
(
i
<
taosArrayGetSize
(
res
)
&&
j
<
len
&&
pColInfoData
)
{
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
int64_t
*
uid
=
taosArrayGet
(
res
,
i
);
qDebug
(
"tagfilter get uid:%ld, res:%d"
,
*
uid
,
*
(
bool
*
)
var
);
if
(
*
(
bool
*
)
var
==
false
)
{
taosArrayRemove
(
res
,
i
);
j
++
;
continue
;
}
i
++
;
j
++
;
}
colDataDestroy
(
pColInfoData
);
taosMemoryFreeClear
(
pColInfoData
);
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
res
);
i
++
)
{
STableKeyInfo
info
=
{.
uid
=
*
(
uint64_t
*
)
taosArrayGet
(
res
,
i
),
.
groupId
=
0
};
taosArrayPush
(
pListInfo
->
pTableList
,
&
info
);
qDebug
(
"tagfilter get uid:%ld"
,
info
.
uid
);
}
taosArrayDestroy
(
res
);
pListInfo
->
pGroupList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
if
(
pListInfo
->
pGroupList
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
214acb74
...
...
@@ -440,7 +440,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
}
else
{
// these are tags
STagVal
tagVal
=
{
0
};
tagVal
.
cid
=
pExpr
->
base
.
pParam
[
0
].
pCol
->
colId
;
const
char
*
p
=
metaGetTableTagVal
(
&
mr
.
me
,
pColInfoData
->
info
.
type
,
&
tagVal
);
const
char
*
p
=
metaGetTableTagVal
(
mr
.
me
.
ctbEntry
.
pTags
,
pColInfoData
->
info
.
type
,
&
tagVal
);
char
*
data
=
NULL
;
if
(
pColInfoData
->
info
.
type
!=
TSDB_DATA_TYPE_JSON
&&
p
!=
NULL
)
{
...
...
@@ -2506,7 +2506,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
}
else
{
// it is a tag value
STagVal
val
=
{
0
};
val
.
cid
=
pExprInfo
[
j
].
base
.
pParam
[
0
].
pCol
->
colId
;
const
char
*
p
=
metaGetTableTagVal
(
&
mr
.
me
,
pDst
->
info
.
type
,
&
val
);
const
char
*
p
=
metaGetTableTagVal
(
mr
.
me
.
ctbEntry
.
pTags
,
pDst
->
info
.
type
,
&
val
);
char
*
data
=
NULL
;
if
(
pDst
->
info
.
type
!=
TSDB_DATA_TYPE_JSON
&&
p
!=
NULL
)
{
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
214acb74
...
...
@@ -292,6 +292,9 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
}
SColumnInfoData
*
columnData
=
(
SColumnInfoData
*
)
taosArrayGet
(
block
->
pDataBlock
,
ref
->
slotId
);
#if TAG_FILTER_DEBUG
qDebug
(
"tagfilter column info, slotId:%d, colId:%d, type:%d"
,
ref
->
slotId
,
columnData
->
info
.
colId
,
columnData
->
info
.
type
);
#endif
param
->
numOfRows
=
block
->
info
.
rows
;
param
->
columnData
=
columnData
;
break
;
...
...
source/libs/stream/inc/streamInc.h
浏览文件 @
214acb74
...
...
@@ -32,7 +32,6 @@ typedef struct {
static
SStreamGlobalEnv
streamEnv
;
int32_t
streamExec
(
SStreamTask
*
pTask
);
int32_t
streamPipelineExec
(
SStreamTask
*
pTask
,
int32_t
batchNum
,
bool
dispatch
);
int32_t
streamDispatch
(
SStreamTask
*
pTask
);
...
...
source/libs/stream/src/stream.c
浏览文件 @
214acb74
...
...
@@ -185,7 +185,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
tFreeStreamDispatchReq
(
pReq
);
if
(
exec
)
{
streamTryExec
(
pTask
);
if
(
streamTryExec
(
pTask
)
<
0
)
{
return
-
1
;
}
if
(
pTask
->
outputType
==
TASK_OUTPUT__FIXED_DISPATCH
||
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
streamDispatch
(
pTask
);
...
...
@@ -221,7 +223,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
}
int32_t
streamProcessRunReq
(
SStreamTask
*
pTask
)
{
streamTryExec
(
pTask
);
if
(
streamTryExec
(
pTask
)
<
0
)
{
return
-
1
;
}
if
(
pTask
->
outputType
==
TASK_OUTPUT__FIXED_DISPATCH
||
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
streamDispatch
(
pTask
);
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
214acb74
...
...
@@ -15,6 +15,7 @@
#include "executor.h"
#include "tstream.h"
#include "ttimer.h"
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
)
{
SStreamMeta
*
pMeta
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamMeta
));
...
...
@@ -99,16 +100,19 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
goto
FAIL
;
}
taosHashPut
(
pMeta
->
pTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
&
pTask
,
sizeof
(
void
*
));
if
(
taosHashPut
(
pMeta
->
pTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
&
pTask
,
sizeof
(
void
*
))
<
0
)
{
goto
FAIL
;
}
if
(
tdbTbUpsert
(
pMeta
->
pTaskDb
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
msg
,
msgLen
,
&
pMeta
->
txn
)
<
0
)
{
taosHashRemove
(
pMeta
->
pTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
));
ASSERT
(
0
);
return
-
1
;
goto
FAIL
;
}
return
0
;
FAIL:
if
(
pTask
)
t
aosMemoryFree
(
pTask
);
if
(
pTask
)
t
FreeSStreamTask
(
pTask
);
return
-
1
;
}
...
...
@@ -158,11 +162,28 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask
*
pTask
=
*
ppTask
;
taosHashRemove
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
atomic_store_8
(
&
pTask
->
taskStatus
,
TASK_STATUS__DROPPING
);
}
if
(
tdbTbDelete
(
pMeta
->
pTaskDb
,
&
taskId
,
sizeof
(
int32_t
),
&
pMeta
->
txn
)
<
0
)
{
/*return -1;*/
if
(
tdbTbDelete
(
pMeta
->
pTaskDb
,
&
taskId
,
sizeof
(
int32_t
),
&
pMeta
->
txn
)
<
0
)
{
/*return -1;*/
}
if
(
pTask
->
triggerParam
!=
0
)
{
taosTmrStop
(
pTask
->
timer
);
}
while
(
1
)
{
int8_t
schedStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
,
TASK_SCHED_STATUS__DROPPING
);
if
(
schedStatus
==
TASK_SCHED_STATUS__INACTIVE
)
{
tFreeSStreamTask
(
pTask
);
break
;
}
else
if
(
schedStatus
==
TASK_SCHED_STATUS__DROPPING
)
{
break
;
}
taosMsleep
(
10
);
}
}
return
0
;
}
...
...
source/libs/stream/src/streamQueue.c
浏览文件 @
214acb74
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
tstream
.h"
#include "
streamInc
.h"
SStreamQueue
*
streamQueueOpen
()
{
SStreamQueue
*
pQueue
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamQueue
));
...
...
@@ -36,9 +36,12 @@ void streamQueueClose(SStreamQueue* queue) {
while
(
1
)
{
void
*
qItem
=
streamQueueNextItem
(
queue
);
if
(
qItem
)
{
taos
FreeQitem
(
qItem
);
stream
FreeQitem
(
qItem
);
}
else
{
return
;
break
;
}
}
taosFreeQall
(
queue
->
qall
);
taosCloseQueue
(
queue
->
queue
);
taosMemoryFree
(
queue
);
}
source/libs/stream/src/streamTask.c
浏览文件 @
214acb74
...
...
@@ -152,9 +152,17 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
void
tFreeSStreamTask
(
SStreamTask
*
pTask
)
{
streamQueueClose
(
pTask
->
inputQueue
);
streamQueueClose
(
pTask
->
outputQueue
);
if
(
pTask
->
inputQueue
)
streamQueueClose
(
pTask
->
inputQueue
);
if
(
pTask
->
outputQueue
)
streamQueueClose
(
pTask
->
outputQueue
);
if
(
pTask
->
exec
.
qmsg
)
taosMemoryFree
(
pTask
->
exec
.
qmsg
);
if
(
pTask
->
exec
.
executor
)
qDestroyTask
(
pTask
->
exec
.
executor
);
taosArrayDestroy
(
pTask
->
childEpInfo
);
if
(
pTask
->
outputType
==
TASK_OUTPUT__TABLE
)
{
tDeleteSSchemaWrapper
(
pTask
->
tbSink
.
pSchemaWrapper
);
taosMemoryFree
(
pTask
->
tbSink
.
pTSchema
);
}
if
(
pTask
->
outputType
==
TASK_OUTPUT__SHUFFLE_DISPATCH
)
{
taosArrayDestroy
(
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
);
}
taosMemoryFree
(
pTask
);
}
source/util/src/tarray.c
浏览文件 @
214acb74
...
...
@@ -386,6 +386,7 @@ void* taosArrayDestroy(SArray* pArray) {
}
void
taosArrayDestroyP
(
SArray
*
pArray
,
FDelete
fp
)
{
if
(
!
pArray
)
return
;
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
i
++
)
{
fp
(
*
(
void
**
)
TARRAY_GET_ELEM
(
pArray
,
i
));
}
...
...
source/util/src/terror.c
浏览文件 @
214acb74
...
...
@@ -293,6 +293,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_STREAM_ALREADY_EXIST
,
"Stream already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_STREAM_NOT_EXIST
,
"Stream not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_STREAM_OPTION
,
"Invalid stream option"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_STREAM_MUST_BE_DELETED
,
"Stream must be dropped first"
)
// mnode-sma
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SMA_ALREADY_EXIST
,
"SMA already exists"
)
...
...
tests/script/tsim/db/basic2.sim
浏览文件 @
214acb74
...
...
@@ -3,6 +3,21 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== conflict stb
sql create database db vgroups 1;
sql use db;
sql create table stb (ts timestamp, i int) tags (j int);
sql_error create table stb using stb tags (1);
sql_error create table stb (ts timestamp, i int);
sql create table ctb (ts timestamp, i int);
sql_error create table ctb (ts timestamp, i int) tags (j int);
sql create table ntb (ts timestamp, i int);
sql_error create table ntb (ts timestamp, i int) tags (j int);
sql drop database db
print =============== create database d1
sql create database d1
sql use d1
...
...
tests/system-test/2-query/json_tag.py
浏览文件 @
214acb74
...
...
@@ -197,7 +197,7 @@ class TDTestCase:
# test where with json tag
tdSql
.
query
(
f
"select * from
{
dbname
}
.jsons1_1 where jtag is not null"
)
tdSql
.
query
(
f
"select * from
{
dbname
}
.jsons1 where jtag='{{
\"
tag1
\"
:11,
\"
tag2
\"
:
\"\"
}}'"
)
tdSql
.
error
(
f
"select * from
{
dbname
}
.jsons1 where jtag='{{
\"
tag1
\"
:11,
\"
tag2
\"
:
\"\"
}}'"
)
tdSql
.
error
(
f
"select * from
{
dbname
}
.jsons1 where jtag->'tag1'={{}}"
)
# test json error
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录