Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
900f5cf5
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
900f5cf5
编写于
9月 01, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
9月 01, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16557 from taosdata/feature/stream
feat(taox): support auto create sub table
上级
4bc13fe0
3bbab43b
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
232 addition
and
103 deletion
+232
-103
include/common/tmsg.h
include/common/tmsg.h
+1
-0
source/client/src/tmq.c
source/client/src/tmq.c
+1
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+11
-0
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+3
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+139
-46
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+63
-47
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+12
-7
未找到文件。
include/common/tmsg.h
浏览文件 @
900f5cf5
...
...
@@ -2994,6 +2994,7 @@ typedef struct {
int32_t
tEncodeSTaosxRsp
(
SEncoder
*
pEncoder
,
const
STaosxRsp
*
pRsp
);
int32_t
tDecodeSTaosxRsp
(
SDecoder
*
pDecoder
,
STaosxRsp
*
pRsp
);
void
tDeleteSTaosxRsp
(
STaosxRsp
*
pRsp
);
typedef
struct
{
SMqRspHead
head
;
...
...
source/client/src/tmq.c
浏览文件 @
900f5cf5
...
...
@@ -1773,7 +1773,7 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
}
return
TMQ_RES_TABLE_META
;
}
else
if
(
TD_RES_TMQ_TAOSX
(
res
))
{
return
TMQ_RES_
TAOSX
;
return
TMQ_RES_
DATA
;
}
else
{
return
TMQ_RES_INVALID
;
}
...
...
source/common/src/tmsg.c
浏览文件 @
900f5cf5
...
...
@@ -5984,6 +5984,17 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
}
return
0
;
}
void
tDeleteSTaosxRsp
(
STaosxRsp
*
pRsp
)
{
taosArrayDestroy
(
pRsp
->
blockDataLen
);
taosArrayDestroyP
(
pRsp
->
blockData
,
(
FDelete
)
taosMemoryFree
);
taosArrayDestroyP
(
pRsp
->
blockSchema
,
(
FDelete
)
tDeleteSSchemaWrapper
);
taosArrayDestroyP
(
pRsp
->
blockTbName
,
(
FDelete
)
taosMemoryFree
);
taosArrayDestroy
(
pRsp
->
createTableLen
);
taosArrayDestroyP
(
pRsp
->
createTableReq
,
(
FDelete
)
taosMemoryFree
);
}
int32_t
tEncodeSSingleDeleteReq
(
SEncoder
*
pEncoder
,
const
SSingleDeleteReq
*
pReq
)
{
if
(
tEncodeI64
(
pEncoder
,
pReq
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pReq
->
ts
)
<
0
)
return
-
1
;
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
900f5cf5
...
...
@@ -140,11 +140,12 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
int32_t
tDecodeSTqHandle
(
SDecoder
*
pDecoder
,
STqHandle
*
pHandle
);
// tqRead
int64_t
tqScan
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqDataRsp
*
pRsp
,
SMqMetaRsp
*
pMetaRsp
,
STqOffsetVal
*
offset
);
int32_t
tqScan
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
STaosxRsp
*
pRsp
,
SMqMetaRsp
*
pMetaRsp
,
STqOffsetVal
*
offset
);
int32_t
tqScanData
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
);
int64_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
pHeadWithCkSum
);
// tqExec
int32_t
tq
LogScanExec
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SSubmitReq
*
pReq
,
SMqData
Rsp
*
pRsp
);
int32_t
tq
TaosxScanLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SSubmitReq
*
pReq
,
STaosx
Rsp
*
pRsp
);
int32_t
tqSendDataRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqDataRsp
*
pRsp
);
// tqMeta
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
900f5cf5
...
...
@@ -196,6 +196,66 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return
0
;
}
int32_t
tqSendTaosxRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
STaosxRsp
*
pRsp
)
{
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockData
)
==
pRsp
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockDataLen
)
==
pRsp
->
blockNum
);
if
(
pRsp
->
withSchema
)
{
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
pRsp
->
blockNum
);
}
else
{
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
0
);
}
if
(
pRsp
->
reqOffset
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
pRsp
->
blockNum
>
0
)
{
ASSERT
(
pRsp
->
rspOffset
.
version
>
pRsp
->
reqOffset
.
version
);
}
else
{
ASSERT
(
pRsp
->
rspOffset
.
version
>=
pRsp
->
reqOffset
.
version
);
}
}
int32_t
len
=
0
;
int32_t
code
=
0
;
tEncodeSize
(
tEncodeSTaosxRsp
,
pRsp
,
len
,
code
);
if
(
code
<
0
)
{
return
-
1
;
}
int32_t
tlen
=
sizeof
(
SMqRspHead
)
+
len
;
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
((
SMqRspHead
*
)
buf
)
->
mqMsgType
=
TMQ_MSG_TYPE__TAOSX_RSP
;
((
SMqRspHead
*
)
buf
)
->
epoch
=
pReq
->
epoch
;
((
SMqRspHead
*
)
buf
)
->
consumerId
=
pReq
->
consumerId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
abuf
,
len
);
tEncodeSTaosxRsp
(
&
encoder
,
pRsp
);
tEncoderClear
(
&
encoder
);
SRpcMsg
rsp
=
{
.
info
=
pMsg
->
info
,
.
pCont
=
buf
,
.
contLen
=
tlen
,
.
code
=
0
,
};
tmsgSendRsp
(
&
rsp
);
char
buf1
[
80
]
=
{
0
};
char
buf2
[
80
]
=
{
0
};
tFormatOffset
(
buf1
,
80
,
&
pRsp
->
reqOffset
);
tFormatOffset
(
buf2
,
80
,
&
pRsp
->
rspOffset
);
tqDebug
(
"taosx rsp, vgId:%d, from consumer:%"
PRId64
", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s"
,
TD_VID
(
pTq
->
pVnode
),
pReq
->
consumerId
,
pReq
->
epoch
,
pRsp
->
blockNum
,
buf1
,
buf2
);
return
0
;
}
static
FORCE_INLINE
bool
tqOffsetLessOrEqual
(
const
STqOffset
*
pLeft
,
const
STqOffset
*
pRight
)
{
return
pLeft
->
val
.
type
==
TMQ_OFFSET__LOG
&&
pRight
->
val
.
type
==
TMQ_OFFSET__LOG
&&
pLeft
->
val
.
version
<=
pRight
->
val
.
version
;
...
...
@@ -303,6 +363,22 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
return
0
;
}
static
int32_t
tqInitTaosxRsp
(
STaosxRsp
*
pRsp
,
const
SMqPollReq
*
pReq
)
{
pRsp
->
reqOffset
=
pReq
->
reqOffset
;
pRsp
->
withTbName
=
1
;
pRsp
->
withSchema
=
1
;
pRsp
->
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
pRsp
->
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
pRsp
->
blockTbName
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
pRsp
->
blockSchema
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
if
(
pRsp
->
blockData
==
NULL
||
pRsp
->
blockDataLen
==
NULL
||
pRsp
->
blockTbName
==
NULL
||
pRsp
->
blockSchema
==
NULL
)
{
return
-
1
;
}
return
0
;
}
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SMqPollReq
*
pReq
=
pMsg
->
pCont
;
int64_t
consumerId
=
pReq
->
consumerId
;
...
...
@@ -341,9 +417,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug
(
"tmq poll: consumer %"
PRId64
" (epoch %d), subkey %s, recv poll req in vg %d, req offset %s"
,
consumerId
,
pReq
->
epoch
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
buf
);
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReq
,
pHandle
->
execHandle
.
subType
);
// 2.reset offset if needed
if
(
reqOffset
.
type
>
0
)
{
fetchOffsetNew
=
reqOffset
;
...
...
@@ -367,6 +440,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqOffsetResetToLog
(
&
fetchOffsetNew
,
walGetFirstVer
(
pTq
->
pVnode
->
pWal
));
}
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_LATEST
)
{
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReq
,
pHandle
->
execHandle
.
subType
);
tqOffsetResetToLog
(
&
dataRsp
.
rspOffset
,
walGetLastVer
(
pTq
->
pVnode
->
pWal
));
tqDebug
(
"tmq poll: consumer %"
PRId64
", subkey %s, vg %d, offset reset to %"
PRId64
,
consumerId
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
dataRsp
.
rspOffset
.
version
);
...
...
@@ -380,16 +456,38 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
" in vg %d, subkey %s, reset none failed"
,
pHandle
->
subKey
,
consumerId
,
TD_VID
(
pTq
->
pVnode
),
pReq
->
subKey
);
terrno
=
TSDB_CODE_TQ_NO_COMMITTED_OFFSET
;
code
=
-
1
;
tDeleteSMqDataRsp
(
&
dataRsp
);
return
code
;
return
-
1
;
}
}
}
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
||
fetchOffsetNew
.
type
!=
TMQ_OFFSET__LOG
)
{
SMqMetaRsp
metaRsp
=
{
0
};
tqScan
(
pTq
,
pHandle
,
&
dataRsp
,
&
metaRsp
,
&
fetchOffsetNew
);
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReq
,
pHandle
->
execHandle
.
subType
);
tqScanData
(
pTq
,
pHandle
,
&
dataRsp
,
&
fetchOffsetNew
);
if
(
tqSendDataRsp
(
pTq
,
pMsg
,
pReq
,
&
dataRsp
)
<
0
)
{
code
=
-
1
;
}
tqDebug
(
"tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%ld, version:%ld"
,
consumerId
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
dataRsp
.
blockNum
,
dataRsp
.
rspOffset
.
type
,
dataRsp
.
rspOffset
.
uid
,
dataRsp
.
rspOffset
.
version
);
tDeleteSMqDataRsp
(
&
dataRsp
);
return
code
;
}
// for taosx
ASSERT
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__COLUMN
);
SMqMetaRsp
metaRsp
=
{
0
};
STaosxRsp
taosxRsp
=
{
0
};
tqInitTaosxRsp
(
&
taosxRsp
,
pReq
);
if
(
fetchOffsetNew
.
type
!=
TMQ_OFFSET__LOG
)
{
tqScan
(
pTq
,
pHandle
,
&
taosxRsp
,
&
metaRsp
,
&
fetchOffsetNew
);
if
(
metaRsp
.
metaRspLen
>
0
)
{
if
(
tqSendMetaPollRsp
(
pTq
,
pMsg
,
pReq
,
&
metaRsp
)
<
0
)
{
...
...
@@ -399,29 +497,30 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
metaRsp
.
rspOffset
.
type
,
metaRsp
.
rspOffset
.
uid
,
metaRsp
.
rspOffset
.
version
);
taosMemoryFree
(
metaRsp
.
metaRsp
);
goto
OVER
;
tDeleteSTaosxRsp
(
&
taosxRsp
);
return
code
;
}
if
(
data
Rsp
.
blockNum
>
0
)
{
if
(
tqSend
DataRsp
(
pTq
,
pMsg
,
pReq
,
&
data
Rsp
)
<
0
)
{
if
(
taosx
Rsp
.
blockNum
>
0
)
{
if
(
tqSend
TaosxRsp
(
pTq
,
pMsg
,
pReq
,
&
taosx
Rsp
)
<
0
)
{
code
=
-
1
;
}
goto
OVER
;
tDeleteSTaosxRsp
(
&
taosxRsp
);
return
code
;
}
else
{
fetchOffsetNew
=
data
Rsp
.
rspOffset
;
fetchOffsetNew
=
taosx
Rsp
.
rspOffset
;
}
tqDebug
(
"t
mq
poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld"
,
consumerId
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
dataRsp
.
blockNum
,
data
Rsp
.
rspOffset
.
type
,
dataRsp
.
rspOffset
.
uid
,
data
Rsp
.
rspOffset
.
version
);
tqDebug
(
"t
aosx
poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld"
,
consumerId
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
taosxRsp
.
blockNum
,
taosx
Rsp
.
rspOffset
.
type
,
taosxRsp
.
rspOffset
.
uid
,
taosx
Rsp
.
rspOffset
.
version
);
}
if
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__COLUMN
&&
fetchOffsetNew
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
fetchOffsetNew
.
type
==
TMQ_OFFSET__LOG
)
{
int64_t
fetchVer
=
fetchOffsetNew
.
version
+
1
;
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
if
(
pCkHead
==
NULL
)
{
code
=
-
1
;
goto
OVER
;
return
-
1
;
}
walSetReaderCapacity
(
pHandle
->
pWalReader
,
2048
);
...
...
@@ -436,13 +535,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
if
(
tqFetchLog
(
pTq
,
pHandle
,
&
fetchVer
,
&
pCkHead
)
<
0
)
{
// TODO add push mgr
tqOffsetResetToLog
(
&
dataRsp
.
rspOffset
,
fetchVer
);
if
(
tqSendDataRsp
(
pTq
,
pMsg
,
pReq
,
&
dataRsp
)
<
0
)
{
tqOffsetResetToLog
(
&
taosxRsp
.
rspOffset
,
fetchVer
);
if
(
tqSendTaosxRsp
(
pTq
,
pMsg
,
pReq
,
&
taosxRsp
)
<
0
)
{
code
=
-
1
;
}
goto
OVER
;
tDeleteSTaosxRsp
(
&
taosxRsp
);
if
(
pCkHead
)
taosMemoryFree
(
pCkHead
);
return
code
;
}
SWalCont
*
pHead
=
&
pCkHead
->
head
;
...
...
@@ -453,17 +552,19 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
SSubmitReq
*
pCont
=
(
SSubmitReq
*
)
&
pHead
->
body
;
if
(
tq
LogScanExec
(
pTq
,
pHandle
,
pCont
,
&
data
Rsp
)
<
0
)
{
if
(
tq
TaosxScanLog
(
pTq
,
pHandle
,
pCont
,
&
taosx
Rsp
)
<
0
)
{
/*ASSERT(0);*/
}
// TODO batch optimization:
// TODO continue scan until meeting batch requirement
if
(
data
Rsp
.
blockNum
>
0
/* threshold */
)
{
tqOffsetResetToLog
(
&
data
Rsp
.
rspOffset
,
fetchVer
);
if
(
tqSend
DataRsp
(
pTq
,
pMsg
,
pReq
,
&
data
Rsp
)
<
0
)
{
if
(
taosx
Rsp
.
blockNum
>
0
/* threshold */
)
{
tqOffsetResetToLog
(
&
taosx
Rsp
.
rspOffset
,
fetchVer
);
if
(
tqSend
TaosxRsp
(
pTq
,
pMsg
,
pReq
,
&
taosx
Rsp
)
<
0
)
{
code
=
-
1
;
}
goto
OVER
;
tDeleteSTaosxRsp
(
&
taosxRsp
);
if
(
pCkHead
)
taosMemoryFree
(
pCkHead
);
return
code
;
}
else
{
fetchVer
++
;
}
...
...
@@ -472,30 +573,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
ASSERT
(
pHandle
->
fetchMeta
);
ASSERT
(
IS_META_MSG
(
pHead
->
msgType
));
tqDebug
(
"fetch meta msg, ver:%"
PRId64
", type:%d"
,
pHead
->
version
,
pHead
->
msgType
);
SMqMetaRsp
metaRsp
=
{
0
};
tqOffsetResetToLog
(
&
metaRsp
.
rspOffset
,
fetchVer
);
metaRsp
.
resMsgType
=
pHead
->
msgType
;
metaRsp
.
metaRspLen
=
pHead
->
bodyLen
;
metaRsp
.
metaRsp
=
pHead
->
body
;
if
(
tqSendMetaPollRsp
(
pTq
,
pMsg
,
pReq
,
&
metaRsp
)
<
0
)
{
code
=
-
1
;
goto
OVER
;
taosMemoryFree
(
pCkHead
);
return
code
;
}
code
=
0
;
goto
OVER
;
if
(
pCkHead
)
taosMemoryFree
(
pCkHead
);
return
code
;
}
}
}
// send empty to client
if
(
tqSendDataRsp
(
pTq
,
pMsg
,
pReq
,
&
dataRsp
)
<
0
)
{
code
=
-
1
;
}
OVER:
if
(
pCkHead
)
taosMemoryFree
(
pCkHead
);
tDeleteSMqDataRsp
(
&
dataRsp
);
return
code
;
return
0
;
}
int32_t
tqProcessVgDeleteReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
)
{
...
...
@@ -602,10 +695,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
pHandle
->
execHandle
.
pExecReader
=
tqOpenReader
(
pTq
->
pVnode
);
pHandle
->
execHandle
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
buildSnapContext
(
handle
.
meta
,
handle
.
version
,
0
,
pHandle
->
execHandle
.
subType
,
pHandle
->
fetchMeta
,
(
SSnapContext
**
)(
&
handle
.
sContext
));
buildSnapContext
(
handle
.
meta
,
handle
.
version
,
0
,
pHandle
->
execHandle
.
subType
,
pHandle
->
fetchMeta
,
(
SSnapContext
**
)(
&
handle
.
sContext
));
pHandle
->
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
handle
,
NULL
,
NULL
);
pHandle
->
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
handle
,
NULL
,
NULL
);
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
pHandle
->
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
900f5cf5
...
...
@@ -60,7 +60,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
return
0
;
}
int
64
_t
tqScanData
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
)
{
int
32
_t
tqScanData
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
)
{
const
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
ASSERT
(
pExec
->
subType
==
TOPIC_SUB_TYPE__COLUMN
);
...
...
@@ -89,18 +89,41 @@ int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
}
tqDebug
(
"tmq task execute
en
d, get %p"
,
pDataBlock
);
tqDebug
(
"tmq task executed, get %p"
,
pDataBlock
);
if
(
pDataBlock
)
{
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
,
pExec
->
numOfCols
);
pRsp
->
blockNum
++
;
if
(
pDataBlock
==
NULL
)
{
break
;
}
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
,
pExec
->
numOfCols
);
pRsp
->
blockNum
++
;
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
rowCnt
+=
pDataBlock
->
info
.
rows
;
if
(
rowCnt
>=
4096
)
break
;
}
}
if
(
qStreamExtractOffset
(
task
,
&
pRsp
->
rspOffset
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
ASSERT
(
pRsp
->
rspOffset
.
type
!=
0
);
if
(
pRsp
->
withTbName
)
{
if
(
pRsp
->
rspOffset
.
type
==
TMQ_OFFSET__LOG
)
{
int64_t
uid
=
pExec
->
pExecReader
->
msgIter
.
uid
;
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
);
}
else
{
pRsp
->
withTbName
=
false
;
}
}
ASSERT
(
pRsp
->
withSchema
==
false
);
return
0
;
}
int
64_t
tqScan
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqData
Rsp
*
pRsp
,
SMqMetaRsp
*
pMetaRsp
,
STqOffsetVal
*
pOffset
)
{
int
32_t
tqScan
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
STaosx
Rsp
*
pRsp
,
SMqMetaRsp
*
pMetaRsp
,
STqOffsetVal
*
pOffset
)
{
const
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
qTaskInfo_t
task
=
pExec
->
task
;
...
...
@@ -134,7 +157,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
int64_t
uid
=
0
;
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
uid
=
pExec
->
pExecReader
->
msgIter
.
uid
;
if
(
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
)
<
0
)
{
if
(
tqAddTbNameToRsp
(
pTq
,
uid
,
(
SMqDataRsp
*
)
pRsp
)
<
0
)
{
continue
;
}
}
else
{
...
...
@@ -144,18 +167,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
}
if
(
pRsp
->
withSchema
)
{
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
tqAddBlockSchemaToRsp
(
pExec
,
pRsp
);
tqAddBlockSchemaToRsp
(
pExec
,
(
SMqDataRsp
*
)
pRsp
);
}
else
{
SSchemaWrapper
*
pSW
=
tCloneSSchemaWrapper
(
qExtractSchemaFromTask
(
task
));
taosArrayPush
(
pRsp
->
blockSchema
,
&
pSW
);
}
}
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
,
pExec
->
numOfCols
);
}
else
{
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
,
taosArrayGetSize
(
pDataBlock
->
pDataBlock
));
}
tqAddBlockDataToRsp
(
pDataBlock
,
(
SMqDataRsp
*
)
pRsp
,
taosArrayGetSize
(
pDataBlock
->
pDataBlock
));
pRsp
->
blockNum
++
;
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
continue
;
...
...
@@ -165,34 +184,32 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
}
}
if
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
pDataBlock
==
NULL
&&
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
if
(
qStreamExtractPrepareUid
(
task
)
!=
0
)
{
continue
;
}
tqDebug
(
"tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %"
PRId64
,
TD_VID
(
pTq
->
pVnode
),
pHandle
->
snapshotVer
+
1
);
break
;
}
if
(
pRsp
->
blockNum
>
0
)
{
tqDebug
(
"tmqsnap task exec exited, get data"
);
break
;
}
SMqMetaRsp
*
tmp
=
qStreamExtractMetaMsg
(
task
);
if
(
tmp
->
rspOffset
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
tqOffsetResetToData
(
pOffset
,
tmp
->
rspOffset
.
uid
,
tmp
->
rspOffset
.
ts
);
qStreamPrepareScan
(
task
,
pOffset
,
pHandle
->
execHandle
.
subType
);
tmp
->
rspOffset
.
type
=
TMQ_OFFSET__SNAPSHOT_META
;
tqDebug
(
"tmqsnap task exec change to get data"
);
if
(
pDataBlock
==
NULL
&&
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
if
(
qStreamExtractPrepareUid
(
task
)
!=
0
)
{
continue
;
}
tqDebug
(
"tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %"
PRId64
,
TD_VID
(
pTq
->
pVnode
),
pHandle
->
snapshotVer
+
1
);
break
;
}
*
pMetaRsp
=
*
tmp
;
tqDebug
(
"tmqsnap task exec exited, get meta"
);
if
(
pRsp
->
blockNum
>
0
)
{
tqDebug
(
"tmqsnap task exec exited, get data"
);
break
;
}
SMqMetaRsp
*
tmp
=
qStreamExtractMetaMsg
(
task
);
if
(
tmp
->
rspOffset
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
tqOffsetResetToData
(
pOffset
,
tmp
->
rspOffset
.
uid
,
tmp
->
rspOffset
.
ts
);
qStreamPrepareScan
(
task
,
pOffset
,
pHandle
->
execHandle
.
subType
);
tmp
->
rspOffset
.
type
=
TMQ_OFFSET__SNAPSHOT_META
;
tqDebug
(
"tmqsnap task exec change to get data"
);
continue
;
}
*
pMetaRsp
=
*
tmp
;
tqDebug
(
"tmqsnap task exec exited, get meta"
);
tqDebug
(
"task exec exited"
);
break
;
}
...
...
@@ -205,12 +222,11 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
return
0
;
}
int32_t
tq
LogScanExec
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SSubmitReq
*
pReq
,
SMqData
Rsp
*
pRsp
)
{
int32_t
tq
TaosxScanLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SSubmitReq
*
pReq
,
STaosx
Rsp
*
pRsp
)
{
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
ASSERT
(
pExec
->
subType
!=
TOPIC_SUB_TYPE__COLUMN
);
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
pRsp
->
withSchema
=
1
;
STqReader
*
pReader
=
pExec
->
pExecReader
;
tqReaderSetDataMsg
(
pReader
,
pReq
,
0
);
while
(
tqNextDataBlock
(
pReader
))
{
...
...
@@ -220,18 +236,17 @@ int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp
}
if
(
pRsp
->
withTbName
)
{
int64_t
uid
=
pExec
->
pExecReader
->
msgIter
.
uid
;
if
(
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
)
<
0
)
{
if
(
tqAddTbNameToRsp
(
pTq
,
uid
,
(
SMqDataRsp
*
)
pRsp
)
<
0
)
{
blockDataFreeRes
(
&
block
);
continue
;
}
}
tqAddBlockDataToRsp
(
&
block
,
pRsp
,
taosArrayGetSize
(
block
.
pDataBlock
));
tqAddBlockDataToRsp
(
&
block
,
(
SMqDataRsp
*
)
pRsp
,
taosArrayGetSize
(
block
.
pDataBlock
));
blockDataFreeRes
(
&
block
);
tqAddBlockSchemaToRsp
(
pExec
,
pRsp
);
tqAddBlockSchemaToRsp
(
pExec
,
(
SMqDataRsp
*
)
pRsp
);
pRsp
->
blockNum
++
;
}
}
else
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
pRsp
->
withSchema
=
1
;
STqReader
*
pReader
=
pExec
->
pExecReader
;
tqReaderSetDataMsg
(
pReader
,
pReq
,
0
);
while
(
tqNextDataBlockFilterOut
(
pReader
,
pExec
->
execDb
.
pFilterOutTbUid
))
{
...
...
@@ -241,24 +256,25 @@ int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp
}
if
(
pRsp
->
withTbName
)
{
int64_t
uid
=
pExec
->
pExecReader
->
msgIter
.
uid
;
if
(
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
)
<
0
)
{
if
(
tqAddTbNameToRsp
(
pTq
,
uid
,
(
SMqDataRsp
*
)
pRsp
)
<
0
)
{
blockDataFreeRes
(
&
block
);
continue
;
}
}
tqAddBlockDataToRsp
(
&
block
,
pRsp
,
taosArrayGetSize
(
block
.
pDataBlock
));
tqAddBlockDataToRsp
(
&
block
,
(
SMqDataRsp
*
)
pRsp
,
taosArrayGetSize
(
block
.
pDataBlock
));
blockDataFreeRes
(
&
block
);
tqAddBlockSchemaToRsp
(
pExec
,
pRsp
);
tqAddBlockSchemaToRsp
(
pExec
,
(
SMqDataRsp
*
)
pRsp
);
pRsp
->
blockNum
++
;
}
#if
0
#if
1
if
(
pHandle
->
fetchMeta
&&
pRsp
->
blockNum
)
{
SSubmitMsgIter
iter
=
{
0
};
tInitSubmitMsgIter
(
pReq
,
&
iter
);
STaosxRsp
*
pXrsp
=
(
STaosxRsp
*
)
pRsp
;
while
(
1
)
{
SSubmitBlk
*
pBlk
=
NULL
;
if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1;
if
(
tGetSubmitMsgNext
(
&
iter
,
&
pBlk
)
<
0
)
break
;
if
(
pBlk
==
NULL
)
break
;
if
(
pBlk
->
schemaLen
>
0
)
{
if
(
pXrsp
->
createTableNum
==
0
)
{
pXrsp
->
createTableLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
900f5cf5
...
...
@@ -143,6 +143,7 @@ typedef struct {
STqOffsetVal
prepareStatus
;
// for tmq
STqOffsetVal
lastStatus
;
// for tmq
SMqMetaRsp
metaRsp
;
// for tmq fetching meta
int8_t
returned
;
int64_t
snapshotVer
;
SSchemaWrapper
*
schema
;
...
...
source/libs/executor/src/executor.c
浏览文件 @
900f5cf5
...
...
@@ -745,6 +745,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
);
pTaskInfo
->
streamInfo
.
prepareStatus
=
*
pOffset
;
pTaskInfo
->
streamInfo
.
returned
=
0
;
if
(
tOffsetEqual
(
pOffset
,
&
pTaskInfo
->
streamInfo
.
lastStatus
))
{
return
0
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
900f5cf5
...
...
@@ -1285,17 +1285,22 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
pTableScanOp
);
if
(
pResult
&&
pResult
->
info
.
rows
>
0
)
{
qDebug
(
"queue scan tsdb return %d rows"
,
pResult
->
info
.
rows
);
pTaskInfo
->
streamInfo
.
returned
=
1
;
return
pResult
;
}
else
{
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTSInfo
->
dataReader
);
pTSInfo
->
dataReader
=
NULL
;
tqOffsetResetToLog
(
&
pTaskInfo
->
streamInfo
.
prepareStatus
,
pTaskInfo
->
streamInfo
.
snapshotVer
);
qDebug
(
"queue scan tsdb over, switch to wal ver %d"
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
);
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
)
<
0
)
{
if
(
!
pTaskInfo
->
streamInfo
.
returned
)
{
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTSInfo
->
dataReader
);
pTSInfo
->
dataReader
=
NULL
;
tqOffsetResetToLog
(
&
pTaskInfo
->
streamInfo
.
prepareStatus
,
pTaskInfo
->
streamInfo
.
snapshotVer
);
qDebug
(
"queue scan tsdb over, switch to wal ver %d"
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
);
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
)
<
0
)
{
return
NULL
;
}
ASSERT
(
pInfo
->
tqReader
->
pWalReader
->
curVersion
==
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
);
}
else
{
return
NULL
;
}
ASSERT
(
pInfo
->
tqReader
->
pWalReader
->
curVersion
==
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录