Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cd2eaf01
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cd2eaf01
编写于
7月 19, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(query): set the output column number for subscribe.
上级
c9e39e72
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
31 addition
and
16 deletion
+31
-16
include/libs/executor/executor.h
include/libs/executor/executor.h
+1
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-1
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+5
-5
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+2
-1
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+6
-6
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+14
-1
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+1
-0
未找到文件。
include/libs/executor/executor.h
浏览文件 @
cd2eaf01
...
...
@@ -63,7 +63,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
* @param SReadHandle
* @return
*/
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
);
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
,
int32_t
*
numOfCols
);
/**
* Set the input data block for the stream scan.
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
cd2eaf01
...
...
@@ -88,7 +88,7 @@ typedef struct {
STqExecTb
execTb
;
STqExecDb
execDb
;
};
int32_t
numOfCols
;
// number of out pout column, temporarily used
}
STqExecHandle
;
typedef
struct
{
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
cd2eaf01
...
...
@@ -506,7 +506,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.
initTqReader
=
true
,
.
version
=
ver
,
};
pHandle
->
execHandle
.
execCol
.
task
[
i
]
=
qCreateQueueExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
);
pHandle
->
execHandle
.
execCol
.
task
[
i
]
=
qCreateQueueExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
,
&
pHandle
->
execHandle
.
numOfCols
);
ASSERT
(
pHandle
->
execHandle
.
execCol
.
task
[
i
]);
void
*
scanner
=
NULL
;
qExtractStreamScanner
(
pHandle
->
execHandle
.
execCol
.
task
[
i
],
&
scanner
);
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
cd2eaf01
...
...
@@ -15,7 +15,7 @@
#include "tq.h"
static
int32_t
tqAddBlockDataToRsp
(
const
SSDataBlock
*
pBlock
,
SMqDataRsp
*
pRsp
)
{
static
int32_t
tqAddBlockDataToRsp
(
const
SSDataBlock
*
pBlock
,
SMqDataRsp
*
pRsp
,
int32_t
numOfCols
)
{
int32_t
dataStrLen
=
sizeof
(
SRetrieveTableRsp
)
+
blockGetEncodeSize
(
pBlock
);
void
*
buf
=
taosMemoryCalloc
(
1
,
dataStrLen
);
if
(
buf
==
NULL
)
return
-
1
;
...
...
@@ -29,7 +29,7 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp)
// TODO enable compress
int32_t
actualLen
=
0
;
blockEncode
(
pBlock
,
pRetrieve
->
data
,
&
actualLen
,
taosArrayGetSize
(
pBlock
->
pDataBlock
)
,
false
);
blockEncode
(
pBlock
,
pRetrieve
->
data
,
&
actualLen
,
numOfCols
,
false
);
actualLen
+=
sizeof
(
SRetrieveTableRsp
);
ASSERT
(
actualLen
<=
dataStrLen
);
taosArrayPush
(
pRsp
->
blockDataLen
,
&
actualLen
);
...
...
@@ -87,7 +87,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
tqDebug
(
"task execute end, get %p"
,
pDataBlock
);
if
(
pDataBlock
!=
NULL
)
{
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
);
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
,
pExec
->
numOfCols
);
pRsp
->
blockNum
++
;
if
(
pRsp
->
withTbName
)
{
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
...
...
@@ -195,7 +195,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if
(
terrno
==
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
)
continue
;
ASSERT
(
0
);
}
tqAddBlockDataToRsp
(
&
block
,
pRsp
);
tqAddBlockDataToRsp
(
&
block
,
pRsp
,
taosArrayGetSize
(
block
.
pDataBlock
)
);
if
(
pRsp
->
withTbName
)
{
int64_t
uid
=
pExec
->
pExecReader
[
workerId
]
->
msgIter
.
uid
;
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
);
...
...
@@ -213,7 +213,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
if
(
terrno
==
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
)
continue
;
ASSERT
(
0
);
}
tqAddBlockDataToRsp
(
&
block
,
pRsp
);
tqAddBlockDataToRsp
(
&
block
,
pRsp
,
taosArrayGetSize
(
block
.
pDataBlock
)
);
if
(
pRsp
->
withTbName
)
{
int64_t
uid
=
pExec
->
pExecReader
[
workerId
]
->
msgIter
.
uid
;
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
);
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
cd2eaf01
...
...
@@ -92,7 +92,8 @@ int32_t tqMetaOpen(STQ* pTq) {
.
initTqReader
=
true
,
.
version
=
handle
.
snapshotVer
,
};
handle
.
execHandle
.
execCol
.
task
[
i
]
=
qCreateQueueExecTaskInfo
(
handle
.
execHandle
.
execCol
.
qmsg
,
&
reader
);
handle
.
execHandle
.
execCol
.
task
[
i
]
=
qCreateQueueExecTaskInfo
(
handle
.
execHandle
.
execCol
.
qmsg
,
&
reader
,
&
handle
.
execHandle
.
numOfCols
);
ASSERT
(
handle
.
execHandle
.
execCol
.
task
[
i
]);
void
*
scanner
=
NULL
;
qExtractStreamScanner
(
handle
.
execHandle
.
execCol
.
task
[
i
],
&
scanner
);
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
cd2eaf01
...
...
@@ -67,15 +67,15 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
return
false
;
}
// clang-format off
// data format:
// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema ...| column#1 length, column#2
// length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
// |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
// actual size | |
// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+
// +----------------+--------------+-----------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
// | |sizeof(int32) |sizeof(uint64_t) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
// +----------------+--------------+-----------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
// clang-format on
static
void
toDataCacheEntry
(
SDataDispatchHandle
*
pHandle
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
int32_t
numOfCols
=
0
;
SNode
*
pNode
;
...
...
source/libs/executor/src/executor.c
浏览文件 @
cd2eaf01
...
...
@@ -104,7 +104,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return
code
;
}
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
)
{
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
,
int32_t
*
numOfCols
)
{
if
(
msg
==
NULL
)
{
// TODO create raw scan
return
NULL
;
...
...
@@ -125,6 +125,19 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers) {
return
NULL
;
}
// extract the number of output columns
SDataBlockDescNode
*
pDescNode
=
plan
->
pNode
->
pOutputDataBlockDesc
;
*
numOfCols
=
0
;
SNode
*
pNode
;
FOREACH
(
pNode
,
pDescNode
->
pSlots
)
{
SSlotDescNode
*
pSlotDesc
=
(
SSlotDescNode
*
)
pNode
;
if
(
pSlotDesc
->
output
)
{
++
(
*
numOfCols
);
}
}
return
pTaskInfo
;
}
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
cd2eaf01
...
...
@@ -39,6 +39,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
taosThreadOnce
(
&
initPoolOnce
,
initRefPool
);
atexit
(
cleanupRefPool
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
sql
,
model
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录