Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7d1a2abe
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
7d1a2abe
编写于
9月 29, 2022
作者:
L
Liu Jicong
提交者:
GitHub
9月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17109 from taosdata/feature/tq
feat(tmq): push optimization
上级
6e79f48f
752e13f9
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
372 addition
and
73 deletion
+372
-73
include/common/tcommon.h
include/common/tcommon.h
+1
-0
include/libs/executor/executor.h
include/libs/executor/executor.h
+8
-5
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+14
-10
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+1
-0
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+2
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+15
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+110
-5
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+3
-3
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+91
-0
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+38
-34
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+7
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+9
-1
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+24
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+43
-4
tests/docs-examples-test/node.sh
tests/docs-examples-test/node.sh
+2
-2
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
未找到文件。
include/common/tcommon.h
浏览文件 @
7d1a2abe
...
...
@@ -177,6 +177,7 @@ typedef struct SSDataBlock {
enum
{
FETCH_TYPE__DATA
=
1
,
FETCH_TYPE__META
,
FETCH_TYPE__SEP
,
FETCH_TYPE__NONE
,
};
...
...
include/libs/executor/executor.h
浏览文件 @
7d1a2abe
...
...
@@ -29,13 +29,13 @@ typedef void* DataSinkHandle;
struct
SRpcMsg
;
struct
SSubplan
;
typedef
int32_t
(
*
localFetchFp
)(
void
*
,
uint64_t
,
uint64_t
,
uint64_t
,
int64_t
,
int32_t
,
void
**
,
SArray
*
);
typedef
int32_t
(
*
localFetchFp
)(
void
*
,
uint64_t
,
uint64_t
,
uint64_t
,
int64_t
,
int32_t
,
void
**
,
SArray
*
);
typedef
struct
{
void
*
handle
;
void
*
handle
;
bool
localExec
;
localFetchFp
fp
;
SArray
*
explainRes
;
SArray
*
explainRes
;
}
SLocalFetch
;
typedef
struct
{
...
...
@@ -51,9 +51,9 @@ typedef struct {
bool
initTqReader
;
int32_t
numOfVgroups
;
void
*
sContext
;
// SSnapContext*
void
*
sContext
;
// SSnapContext*
void
*
pStateBackend
;
void
*
pStateBackend
;
}
SReadHandle
;
// in queue mode, data streams are seperated by msg
...
...
@@ -136,6 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
* @param handle
* @return
*/
int32_t
qExecTaskOpt
(
qTaskInfo_t
tinfo
,
SArray
*
pResList
,
uint64_t
*
useconds
,
bool
*
hasMore
,
SLocalFetch
*
pLocal
);
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
SSDataBlock
**
pBlock
,
uint64_t
*
useconds
);
...
...
@@ -195,6 +196,8 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
STqOffsetVal
*
pOffset
,
int8_t
subType
);
int32_t
qStreamScanMemData
(
qTaskInfo_t
tinfo
,
const
SSubmitReq
*
pReq
);
int32_t
qStreamExtractOffset
(
qTaskInfo_t
tinfo
,
STqOffsetVal
*
pOffset
);
SMqMetaRsp
*
qStreamExtractMetaMsg
(
qTaskInfo_t
tinfo
);
...
...
source/client/src/clientTmq.c
浏览文件 @
7d1a2abe
...
...
@@ -515,7 +515,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
msg
;
topic
=
pMetaRspObj
->
topic
;
vgId
=
pMetaRspObj
->
vgId
;
}
else
if
(
TD_RES_TMQ_METADATA
(
msg
))
{
}
else
if
(
TD_RES_TMQ_METADATA
(
msg
))
{
SMqTaosxRspObj
*
pRspObj
=
(
SMqTaosxRspObj
*
)
msg
;
topic
=
pRspObj
->
topic
;
vgId
=
pRspObj
->
vgId
;
...
...
@@ -715,7 +715,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
int32_t
epoch
=
tmq
->
epoch
;
SMqHbReq
*
pReq
=
taosMemoryMalloc
(
sizeof
(
SMqHbReq
));
if
(
pReq
==
NULL
)
goto
OVER
;
pReq
->
consumerId
=
consumerId
;
pReq
->
consumerId
=
htobe64
(
consumerId
)
;
pReq
->
epoch
=
epoch
;
SMsgSendInfo
*
sendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
...
...
@@ -1603,6 +1603,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return
NULL
;
}
else
if
(
rspWrapper
->
tmqRspType
==
TMQ_MSG_TYPE__POLL_RSP
)
{
SMqPollRspWrapper
*
pollRspWrapper
=
(
SMqPollRspWrapper
*
)
rspWrapper
;
tscDebug
(
"consumer %ld actual process poll rsp"
,
tmq
->
consumerId
);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t
consumerEpoch
=
atomic_load_32
(
&
tmq
->
epoch
);
if
(
pollRspWrapper
->
dataRsp
.
head
.
epoch
==
consumerEpoch
)
{
...
...
@@ -1661,9 +1662,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// build rsp
void
*
pRsp
=
NULL
;
if
(
pollRspWrapper
->
taosxRsp
.
createTableNum
==
0
)
{
if
(
pollRspWrapper
->
taosxRsp
.
createTableNum
==
0
)
{
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
);
}
else
{
}
else
{
pRsp
=
tmqBuildTaosxRspFromWrapper
(
pollRspWrapper
);
}
taosFreeQitem
(
pollRspWrapper
);
...
...
@@ -1718,7 +1719,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
while
(
1
)
{
tmqHandleAllDelayedTask
(
tmq
);
if
(
tmqPollImpl
(
tmq
,
timeout
)
<
0
)
return
NULL
;
if
(
tmqPollImpl
(
tmq
,
timeout
)
<
0
)
{
tscDebug
(
"return since poll err"
);
/*return NULL;*/
}
rspObj
=
tmqHandleAllRsp
(
tmq
,
timeout
,
false
);
if
(
rspObj
)
{
...
...
@@ -1850,12 +1854,12 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return
(
const
char
*
)
taosArrayGetP
(
pRspObj
->
rsp
.
blockTbName
,
pRspObj
->
resIter
);
}
else
if
(
TD_RES_TMQ_METADATA
(
res
))
{
SMqTaosxRspObj
*
pRspObj
=
(
SMqTaosxRspObj
*
)
res
;
if
(
!
pRspObj
->
rsp
.
withTbName
||
pRspObj
->
rsp
.
blockTbName
==
NULL
||
pRspObj
->
resIter
<
0
||
pRspObj
->
resIter
>=
pRspObj
->
rsp
.
blockNum
)
{
return
NULL
;
}
return
(
const
char
*
)
taosArrayGetP
(
pRspObj
->
rsp
.
blockTbName
,
pRspObj
->
resIter
);
if
(
!
pRspObj
->
rsp
.
withTbName
||
pRspObj
->
rsp
.
blockTbName
==
NULL
||
pRspObj
->
resIter
<
0
||
pRspObj
->
resIter
>=
pRspObj
->
rsp
.
blockNum
)
{
return
NULL
;
}
return
(
const
char
*
)
taosArrayGetP
(
pRspObj
->
rsp
.
blockTbName
,
pRspObj
->
resIter
);
}
return
NULL
;
}
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
7d1a2abe
...
...
@@ -272,6 +272,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
mError
(
"consumer %ld not exist"
,
consumerId
);
terrno
=
TSDB_CODE_MND_CONSUMER_NOT_EXIST
;
return
-
1
;
}
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
7d1a2abe
...
...
@@ -379,6 +379,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj
.
ast
=
strdup
(
pCreate
->
ast
);
topicObj
.
astLen
=
strlen
(
pCreate
->
ast
)
+
1
;
qDebugL
(
"ast %s"
,
topicObj
.
ast
);
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pCreate
->
ast
,
&
pAst
)
!=
0
)
{
taosMemoryFree
(
topicObj
.
ast
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
7d1a2abe
...
...
@@ -217,7 +217,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t
tqSeekVer
(
STqReader
*
pReader
,
int64_t
ver
);
int32_t
tqNextBlock
(
STqReader
*
pReader
,
SFetchRet
*
ret
);
int32_t
tqReaderSetDataMsg
(
STqReader
*
pReader
,
SSubmitReq
*
pMsg
,
int64_t
ver
);
int32_t
tqReaderSetDataMsg
(
STqReader
*
pReader
,
const
SSubmitReq
*
pMsg
,
int64_t
ver
);
bool
tqNextDataBlock
(
STqReader
*
pReader
);
bool
tqNextDataBlockFilterOut
(
STqReader
*
pReader
,
SHashObj
*
filterOutUids
);
int32_t
tqRetrieveDataBlock
(
SSDataBlock
*
pBlock
,
STqReader
*
pReader
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
7d1a2abe
...
...
@@ -113,10 +113,20 @@ typedef struct {
}
STqHandle
;
typedef
struct
{
SMqDataRsp
dataRsp
;
SMqRspHead
rspHead
;
char
subKey
[
TSDB_SUBSCRIBE_KEY_LEN
];
SRpcHandleInfo
pInfo
;
}
STqPushEntry
;
struct
STQ
{
SVnode
*
pVnode
;
char
*
path
;
SHashObj
*
pPushMgr
;
// consumerId -> STqHandle*
SVnode
*
pVnode
;
char
*
path
;
SRWLatch
pushLock
;
SHashObj
*
pPushMgr
;
// consumerId -> STqPushEntry
SHashObj
*
pHandle
;
// subKey -> STqHandle
SHashObj
*
pCheckInfo
;
// topic -> SAlterCheckInfo
...
...
@@ -146,7 +156,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
// tqExec
int32_t
tqTaosxScanLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SSubmitReq
*
pReq
,
STaosxRsp
*
pRsp
);
int32_t
tqAddBlockDataToRsp
(
const
SSDataBlock
*
pBlock
,
SMqDataRsp
*
pRsp
,
int32_t
numOfCols
);
int32_t
tqSendDataRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqDataRsp
*
pRsp
);
int32_t
tqPushDataRsp
(
STQ
*
pTq
,
STqPushEntry
*
pPushEntry
);
// tqMeta
int32_t
tqMetaOpen
(
STQ
*
pTq
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
7d1a2abe
...
...
@@ -65,6 +65,11 @@ static void destroySTqHandle(void* data) {
}
}
static
void
tqPushEntryFree
(
void
*
data
)
{
STqPushEntry
*
p
=
*
(
void
**
)
data
;
taosMemoryFree
(
p
);
}
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
)
{
STQ
*
pTq
=
taosMemoryCalloc
(
1
,
sizeof
(
STQ
));
if
(
pTq
==
NULL
)
{
...
...
@@ -78,7 +83,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosHashSetFreeFp
(
pTq
->
pHandle
,
destroySTqHandle
);
pTq
->
pPushMgr
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
taosInitRWLatch
(
&
pTq
->
pushLock
);
pTq
->
pPushMgr
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
);
taosHashSetFreeFp
(
pTq
->
pPushMgr
,
tqPushEntryFree
);
pTq
->
pCheckInfo
=
taosHashInit
(
64
,
MurmurHash3_32
,
true
,
HASH_ENTRY_LOCK
);
...
...
@@ -153,6 +160,65 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
return
0
;
}
int32_t
tqPushDataRsp
(
STQ
*
pTq
,
STqPushEntry
*
pPushEntry
)
{
SMqDataRsp
*
pRsp
=
&
pPushEntry
->
dataRsp
;
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockData
)
==
pRsp
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockDataLen
)
==
pRsp
->
blockNum
);
ASSERT
(
!
pRsp
->
withSchema
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
0
);
if
(
pRsp
->
reqOffset
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
pRsp
->
blockNum
>
0
)
{
ASSERT
(
pRsp
->
rspOffset
.
version
>
pRsp
->
reqOffset
.
version
);
}
else
{
ASSERT
(
pRsp
->
rspOffset
.
version
>=
pRsp
->
reqOffset
.
version
);
}
}
int32_t
len
=
0
;
int32_t
code
=
0
;
tEncodeSize
(
tEncodeSMqDataRsp
,
pRsp
,
len
,
code
);
if
(
code
<
0
)
{
return
-
1
;
}
int32_t
tlen
=
sizeof
(
SMqRspHead
)
+
len
;
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
memcpy
(
buf
,
&
pPushEntry
->
rspHead
,
sizeof
(
SMqRspHead
));
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
abuf
,
len
);
tEncodeSMqDataRsp
(
&
encoder
,
pRsp
);
tEncoderClear
(
&
encoder
);
SRpcMsg
rsp
=
{
.
info
=
pPushEntry
->
pInfo
,
.
pCont
=
buf
,
.
contLen
=
tlen
,
.
code
=
0
,
};
tmsgSendRsp
(
&
rsp
);
char
buf1
[
80
]
=
{
0
};
char
buf2
[
80
]
=
{
0
};
tFormatOffset
(
buf1
,
80
,
&
pRsp
->
reqOffset
);
tFormatOffset
(
buf2
,
80
,
&
pRsp
->
rspOffset
);
tqDebug
(
"vgId:%d, from consumer:%"
PRId64
", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s"
,
TD_VID
(
pTq
->
pVnode
),
pPushEntry
->
rspHead
.
consumerId
,
pRsp
->
head
.
epoch
,
pRsp
->
blockNum
,
buf1
,
buf2
);
return
0
;
}
int32_t
tqSendDataRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqDataRsp
*
pRsp
)
{
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockData
)
==
pRsp
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockDataLen
)
==
pRsp
->
blockNum
);
...
...
@@ -354,6 +420,8 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
return
-
1
;
}
pRsp
->
withTbName
=
0
;
#if 0
pRsp->withTbName = pReq->withTbName;
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
...
...
@@ -362,6 +430,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
return -1;
}
}
#endif
if
(
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
pRsp
->
withSchema
=
false
;
...
...
@@ -477,11 +546,33 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReq
,
pHandle
->
execHandle
.
subType
);
// lock
taosWLockLatch
(
&
pTq
->
pushLock
);
tqScanData
(
pTq
,
pHandle
,
&
dataRsp
,
&
fetchOffsetNew
);
#if 1
if
(
dataRsp
.
blockNum
==
0
&&
dataRsp
.
reqOffset
.
type
==
TMQ_OFFSET__LOG
&&
dataRsp
.
reqOffset
.
version
==
dataRsp
.
rspOffset
.
version
)
{
STqPushEntry
*
pPushEntry
=
taosMemoryCalloc
(
1
,
sizeof
(
STqPushEntry
));
if
(
pPushEntry
!=
NULL
)
{
pPushEntry
->
pInfo
=
pMsg
->
info
;
memcpy
(
pPushEntry
->
subKey
,
pHandle
->
subKey
,
TSDB_SUBSCRIBE_KEY_LEN
);
dataRsp
.
withTbName
=
0
;
memcpy
(
&
pPushEntry
->
dataRsp
,
&
dataRsp
,
sizeof
(
SMqDataRsp
));
pPushEntry
->
rspHead
.
consumerId
=
consumerId
;
pPushEntry
->
rspHead
.
epoch
=
reqEpoch
;
pPushEntry
->
rspHead
.
mqMsgType
=
TMQ_MSG_TYPE__POLL_RSP
;
taosHashPut
(
pTq
->
pPushMgr
,
pHandle
->
subKey
,
strlen
(
pHandle
->
subKey
)
+
1
,
&
pPushEntry
,
sizeof
(
void
*
));
tqDebug
(
"tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr"
,
consumerId
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
));
// unlock
taosWUnLockLatch
(
&
pTq
->
pushLock
);
return
0
;
}
}
taosWUnLockLatch
(
&
pTq
->
pushLock
);
#endif
if
(
tqSendDataRsp
(
pTq
,
pMsg
,
pReq
,
&
dataRsp
)
<
0
)
{
code
=
-
1
;
}
...
...
@@ -614,10 +705,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t
tqProcessVgDeleteReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
)
{
SMqVDeleteReq
*
pReq
=
(
SMqVDeleteReq
*
)
msg
;
int32_t
code
=
taosHashRemove
(
pTq
->
pHandle
,
pReq
->
subKey
,
strlen
(
pReq
->
subKey
));
ASSERT
(
code
==
0
);
taosWLockLatch
(
&
pTq
->
pushLock
);
int32_t
code
=
taosHashRemove
(
pTq
->
pPushMgr
,
pReq
->
subKey
,
strlen
(
pReq
->
subKey
));
if
(
code
!=
0
)
{
tqDebug
(
"vgId:%d, tq remove push handle %s"
,
pTq
->
pVnode
->
config
.
vgId
,
pReq
->
subKey
);
}
taosWUnLockLatch
(
&
pTq
->
pushLock
);
code
=
taosHashRemove
(
pTq
->
pHandle
,
pReq
->
subKey
,
strlen
(
pReq
->
subKey
));
if
(
code
!=
0
)
{
tqError
(
"cannot process tq delete req %s, since no such handle"
,
pReq
->
subKey
);
}
tqOffsetDelete
(
pTq
->
pOffsetStore
,
pReq
->
subKey
);
code
=
tqOffsetDelete
(
pTq
->
pOffsetStore
,
pReq
->
subKey
);
if
(
code
!=
0
)
{
tqError
(
"cannot process tq delete req %s, since no such offset"
,
pReq
->
subKey
);
}
if
(
tqMetaDeleteHandle
(
pTq
,
pReq
->
subKey
)
<
0
)
{
ASSERT
(
0
);
...
...
@@ -756,7 +859,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
atomic_add_fetch_32
(
&
pHandle
->
epoch
,
1
);
if
(
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
// TODO
ASSERT
(
0
);
}
// close handle
}
return
0
;
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
7d1a2abe
...
...
@@ -15,7 +15,7 @@
#include "tq.h"
static
int32_t
tqAddBlockDataToRsp
(
const
SSDataBlock
*
pBlock
,
SMqDataRsp
*
pRsp
,
int32_t
numOfCols
)
{
int32_t
tqAddBlockDataToRsp
(
const
SSDataBlock
*
pBlock
,
SMqDataRsp
*
pRsp
,
int32_t
numOfCols
)
{
int32_t
dataStrLen
=
sizeof
(
SRetrieveTableRsp
)
+
blockGetEncodeSize
(
pBlock
);
void
*
buf
=
taosMemoryCalloc
(
1
,
dataStrLen
);
if
(
buf
==
NULL
)
return
-
1
;
...
...
@@ -243,7 +243,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
}
if
(
pHandle
->
fetchMeta
)
{
SSubmitBlk
*
pBlk
=
pReader
->
pBlock
;
int32_t
schemaLen
=
htonl
(
pBlk
->
schemaLen
);
int32_t
schemaLen
=
htonl
(
pBlk
->
schemaLen
);
if
(
schemaLen
>
0
)
{
if
(
pRsp
->
createTableNum
==
0
)
{
pRsp
->
createTableLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
...
...
@@ -278,7 +278,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
}
if
(
pHandle
->
fetchMeta
)
{
SSubmitBlk
*
pBlk
=
pReader
->
pBlock
;
int32_t
schemaLen
=
htonl
(
pBlk
->
schemaLen
);
int32_t
schemaLen
=
htonl
(
pBlk
->
schemaLen
);
if
(
schemaLen
>
0
)
{
if
(
pRsp
->
createTableNum
==
0
)
{
pRsp
->
createTableLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
7d1a2abe
...
...
@@ -213,6 +213,97 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
tqDebug
(
"vgId:%d tq push msg ver %ld, type: %s"
,
pTq
->
pVnode
->
config
.
vgId
,
ver
,
TMSG_INFO
(
msgType
));
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
// lock push mgr to avoid potential msg lost
taosWLockLatch
(
&
pTq
->
pushLock
);
tqDebug
(
"vgId:%d push handle num %d"
,
pTq
->
pVnode
->
config
.
vgId
,
taosHashGetSize
(
pTq
->
pPushMgr
));
if
(
taosHashGetSize
(
pTq
->
pPushMgr
)
!=
0
)
{
SArray
*
cachedKeys
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
SArray
*
cachedKeyLens
=
taosArrayInit
(
0
,
sizeof
(
size_t
));
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tqError
(
"failed to copy data for stream since out of memory"
);
return
-
1
;
}
memcpy
(
data
,
msg
,
msgLen
);
SSubmitReq
*
pReq
=
(
SSubmitReq
*
)
data
;
pReq
->
version
=
ver
;
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pPushMgr
,
pIter
);
if
(
pIter
==
NULL
)
break
;
STqPushEntry
*
pPushEntry
=
*
(
STqPushEntry
**
)
pIter
;
STqHandle
*
pHandle
=
taosHashGet
(
pTq
->
pHandle
,
pPushEntry
->
subKey
,
strlen
(
pPushEntry
->
subKey
));
if
(
pHandle
==
NULL
)
{
tqDebug
(
"vgId:%d cannot find handle %s"
,
pTq
->
pVnode
->
config
.
vgId
,
pPushEntry
->
subKey
);
continue
;
}
if
(
pPushEntry
->
dataRsp
.
reqOffset
.
version
>
ver
)
{
tqDebug
(
"vgId:%d push entry req version %ld, while push version %ld, skip"
,
pTq
->
pVnode
->
config
.
vgId
,
pPushEntry
->
dataRsp
.
reqOffset
.
version
,
ver
);
continue
;
}
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
qTaskInfo_t
task
=
pExec
->
task
;
SMqDataRsp
*
pRsp
=
&
pPushEntry
->
dataRsp
;
// prepare scan mem data
qStreamScanMemData
(
task
,
pReq
);
// exec
while
(
1
)
{
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
}
if
(
pDataBlock
==
NULL
)
{
break
;
}
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
,
pExec
->
numOfCols
);
pRsp
->
blockNum
++
;
}
tqDebug
(
"vgId:%d tq handle push, subkey: %s, block num: %d"
,
pTq
->
pVnode
->
config
.
vgId
,
pPushEntry
->
subKey
,
pRsp
->
blockNum
);
if
(
pRsp
->
blockNum
>
0
)
{
// set offset
tqOffsetResetToLog
(
&
pRsp
->
rspOffset
,
ver
);
// remove from hash
size_t
kLen
;
void
*
key
=
taosHashGetKey
(
pIter
,
&
kLen
);
void
*
keyCopy
=
taosMemoryMalloc
(
kLen
);
memcpy
(
keyCopy
,
key
,
kLen
);
taosArrayPush
(
cachedKeys
,
&
keyCopy
);
taosArrayPush
(
cachedKeyLens
,
&
kLen
);
tqPushDataRsp
(
pTq
,
pPushEntry
);
}
}
// delete entry
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
cachedKeys
);
i
++
)
{
void
*
key
=
taosArrayGetP
(
cachedKeys
,
i
);
size_t
kLen
=
*
(
size_t
*
)
taosArrayGet
(
cachedKeyLens
,
i
);
if
(
taosHashRemove
(
pTq
->
pPushMgr
,
key
,
kLen
)
!=
0
)
{
ASSERT
(
0
);
}
}
taosArrayDestroyP
(
cachedKeys
,
(
FDelete
)
taosMemoryFree
);
taosArrayDestroy
(
cachedKeyLens
);
}
// unlock
taosWUnLockLatch
(
&
pTq
->
pushLock
);
}
if
(
vnodeIsRoleLeader
(
pTq
->
pVnode
))
{
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamMeta
->
pTasks
)
==
0
)
return
0
;
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
7d1a2abe
...
...
@@ -15,21 +15,20 @@
#include "tq.h"
bool
isValValidForTable
(
STqHandle
*
pHandle
,
SWalCont
*
pHead
){
if
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__TABLE
){
bool
isValValidForTable
(
STqHandle
*
pHandle
,
SWalCont
*
pHead
)
{
if
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__TABLE
)
{
return
true
;
}
int16_t
msgType
=
pHead
->
msgType
;
char
*
body
=
pHead
->
body
;
int32_t
bodyLen
=
pHead
->
bodyLen
;
int16_t
msgType
=
pHead
->
msgType
;
char
*
body
=
pHead
->
body
;
int32_t
bodyLen
=
pHead
->
bodyLen
;
int64_t
tbSuid
=
pHandle
->
execHandle
.
execTb
.
suid
;
int64_t
realTbSuid
=
0
;
SDecoder
coder
;
void
*
data
=
POINTER_SHIFT
(
body
,
sizeof
(
SMsgHead
));
int32_t
len
=
bodyLen
-
sizeof
(
SMsgHead
);
int64_t
tbSuid
=
pHandle
->
execHandle
.
execTb
.
suid
;
int64_t
realTbSuid
=
0
;
SDecoder
coder
;
void
*
data
=
POINTER_SHIFT
(
body
,
sizeof
(
SMsgHead
));
int32_t
len
=
bodyLen
-
sizeof
(
SMsgHead
);
tDecoderInit
(
&
coder
,
data
,
len
);
if
(
msgType
==
TDMT_VND_CREATE_STB
||
msgType
==
TDMT_VND_ALTER_STB
)
{
...
...
@@ -43,38 +42,38 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
if
(
tDecodeSVDropStbReq
(
&
coder
,
&
req
)
<
0
)
{
goto
end
;
}
realTbSuid
=
req
.
suid
;
realTbSuid
=
req
.
suid
;
}
else
if
(
msgType
==
TDMT_VND_CREATE_TABLE
)
{
SVCreateTbBatchReq
req
=
{
0
};
if
(
tDecodeSVCreateTbBatchReq
(
&
coder
,
&
req
)
<
0
)
{
goto
end
;
}
int32_t
needRebuild
=
0
;
int32_t
needRebuild
=
0
;
SVCreateTbReq
*
pCreateReq
=
NULL
;
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pCreateReq
=
req
.
pReqs
+
iReq
;
if
(
pCreateReq
->
type
==
TSDB_CHILD_TABLE
&&
pCreateReq
->
ctb
.
suid
==
tbSuid
)
{
if
(
pCreateReq
->
type
==
TSDB_CHILD_TABLE
&&
pCreateReq
->
ctb
.
suid
==
tbSuid
)
{
needRebuild
++
;
}
}
if
(
needRebuild
==
0
)
{
if
(
needRebuild
==
0
)
{
// do nothing
}
else
if
(
needRebuild
==
req
.
nReqs
)
{
}
else
if
(
needRebuild
==
req
.
nReqs
)
{
realTbSuid
=
tbSuid
;
}
else
{
}
else
{
realTbSuid
=
tbSuid
;
SVCreateTbBatchReq
reqNew
=
{
0
};
reqNew
.
pArray
=
taosArrayInit
(
req
.
nReqs
,
sizeof
(
struct
SVCreateTbReq
));
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pCreateReq
=
req
.
pReqs
+
iReq
;
if
(
pCreateReq
->
type
==
TSDB_CHILD_TABLE
&&
pCreateReq
->
ctb
.
suid
==
tbSuid
)
{
if
(
pCreateReq
->
type
==
TSDB_CHILD_TABLE
&&
pCreateReq
->
ctb
.
suid
==
tbSuid
)
{
reqNew
.
nReqs
++
;
taosArrayPush
(
reqNew
.
pArray
,
pCreateReq
);
}
}
int
tlen
;
int
tlen
;
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSVCreateTbBatchReq
,
&
reqNew
,
tlen
,
ret
);
void
*
buf
=
taosMemoryMalloc
(
tlen
);
...
...
@@ -107,7 +106,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
}
}
}
else
if
(
msgType
==
TDMT_VND_ALTER_TABLE
)
{
SVAlterTbReq
req
=
{
0
};
SVAlterTbReq
req
=
{
0
};
if
(
tDecodeSVAlterTbReq
(
&
coder
,
&
req
)
<
0
)
{
goto
end
;
...
...
@@ -129,32 +128,32 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
goto
end
;
}
int32_t
needRebuild
=
0
;
int32_t
needRebuild
=
0
;
SVDropTbReq
*
pDropReq
=
NULL
;
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pDropReq
=
req
.
pReqs
+
iReq
;
if
(
pDropReq
->
suid
==
tbSuid
)
{
if
(
pDropReq
->
suid
==
tbSuid
)
{
needRebuild
++
;
}
}
if
(
needRebuild
==
0
)
{
if
(
needRebuild
==
0
)
{
// do nothing
}
else
if
(
needRebuild
==
req
.
nReqs
)
{
}
else
if
(
needRebuild
==
req
.
nReqs
)
{
realTbSuid
=
tbSuid
;
}
else
{
}
else
{
realTbSuid
=
tbSuid
;
SVDropTbBatchReq
reqNew
=
{
0
};
reqNew
.
pArray
=
taosArrayInit
(
req
.
nReqs
,
sizeof
(
SVDropTbReq
));
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pDropReq
=
req
.
pReqs
+
iReq
;
if
(
pDropReq
->
suid
==
tbSuid
)
{
if
(
pDropReq
->
suid
==
tbSuid
)
{
reqNew
.
nReqs
++
;
taosArrayPush
(
reqNew
.
pArray
,
pDropReq
);
}
}
int
tlen
;
int
tlen
;
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSVDropTbBatchReq
,
&
reqNew
,
tlen
,
ret
);
void
*
buf
=
taosMemoryMalloc
(
tlen
);
...
...
@@ -177,11 +176,11 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
goto
end
;
}
realTbSuid
=
req
.
suid
;
}
else
{
}
else
{
ASSERT
(
0
);
}
end:
end:
tDecoderClear
(
&
coder
);
return
tbSuid
==
realTbSuid
;
}
...
...
@@ -224,7 +223,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
code
=
-
1
;
goto
END
;
}
if
(
isValValidForTable
(
pHandle
,
pHead
))
{
if
(
isValValidForTable
(
pHandle
,
pHead
))
{
*
fetchOffset
=
offset
;
code
=
0
;
goto
END
;
...
...
@@ -241,7 +240,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
offset
++
;
}
}
END:
END:
taosThreadMutexUnlock
(
&
pHandle
->
pWalReader
->
mutex
);
return
code
;
}
...
...
@@ -315,14 +314,18 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
return
-
1
;
}
void
*
body
=
pReader
->
pWalReader
->
pHead
->
head
.
body
;
#if 0
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
// TODO do filter
ret->fetchType = FETCH_TYPE__META;
ret->meta = pReader->pWalReader->pHead->head.body;
return 0;
} else {
tqReaderSetDataMsg
(
pReader
,
body
,
pReader
->
pWalReader
->
pHead
->
head
.
version
);
#endif
tqReaderSetDataMsg
(
pReader
,
body
,
pReader
->
pWalReader
->
pHead
->
head
.
version
);
#if 0
}
#endif
}
while
(
tqNextDataBlock
(
pReader
))
{
...
...
@@ -334,6 +337,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
continue
;
}
ret
->
fetchType
=
FETCH_TYPE__DATA
;
tqDebug
(
"return data rows %d"
,
ret
->
data
.
info
.
rows
);
return
0
;
}
...
...
@@ -341,14 +345,14 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
ret
->
offset
.
type
=
TMQ_OFFSET__LOG
;
ret
->
offset
.
version
=
pReader
->
ver
;
ASSERT
(
pReader
->
ver
>=
0
);
ret
->
fetchType
=
FETCH_TYPE__
NONE
;
ret
->
fetchType
=
FETCH_TYPE__
SEP
;
tqDebug
(
"return offset %"
PRId64
", processed finish"
,
ret
->
offset
.
version
);
return
0
;
}
}
}
int32_t
tqReaderSetDataMsg
(
STqReader
*
pReader
,
SSubmitReq
*
pMsg
,
int64_t
ver
)
{
int32_t
tqReaderSetDataMsg
(
STqReader
*
pReader
,
const
SSubmitReq
*
pMsg
,
int64_t
ver
)
{
pReader
->
pMsg
=
pMsg
;
if
(
tInitSubmitMsgIter
(
pMsg
,
&
pReader
->
msgIter
)
<
0
)
return
-
1
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
7d1a2abe
...
...
@@ -316,6 +316,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return
0
;
}
if
(
pMsg
->
msgType
==
TDMT_VND_CONSUME
&&
!
pVnode
->
restored
)
{
vnodeRedirectRpcMsg
(
pVnode
,
pMsg
);
return
0
;
}
char
*
msgstr
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
...
...
@@ -808,7 +813,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
SSubmitRsp
submitRsp
=
{
0
};
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitBlk
*
pBlock
;
SSubmitRsp
rsp
=
{
0
};
SVCreateTbReq
createTbReq
=
{
0
};
SDecoder
decoder
=
{
0
};
int32_t
nRows
;
...
...
@@ -921,7 +925,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
}
if
(
taosArrayGetSize
(
newTbUids
)
>
0
)
{
vDebug
(
"vgId:%d, add %d table into query table list in handling submit"
,
TD_VID
(
pVnode
),
(
int32_t
)
taosArrayGetSize
(
newTbUids
));
vDebug
(
"vgId:%d, add %d table into query table list in handling submit"
,
TD_VID
(
pVnode
),
(
int32_t
)
taosArrayGetSize
(
newTbUids
));
}
tqUpdateTbUidList
(
pVnode
->
pTq
,
newTbUids
,
true
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
7d1a2abe
...
...
@@ -146,6 +146,7 @@ typedef struct {
SMqMetaRsp
metaRsp
;
// for tmq fetching meta
int8_t
returned
;
int64_t
snapshotVer
;
const
SSubmitReq
*
pReq
;
SSchemaWrapper
*
schema
;
char
tbName
[
TSDB_TABLE_NAME_LEN
];
...
...
@@ -192,6 +193,7 @@ enum {
OP_OPENED
=
0x1
,
OP_RES_TO_RETURN
=
0x5
,
OP_EXEC_DONE
=
0x9
,
OP_EXEC_RECV
=
0x11
,
};
typedef
struct
SOperatorFpSet
{
...
...
source/libs/executor/src/executor.c
浏览文件 @
7d1a2abe
...
...
@@ -486,7 +486,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
if
(
pLocal
)
{
memcpy
(
&
pTaskInfo
->
localFetch
,
pLocal
,
sizeof
(
*
pLocal
));
}
taosArrayClearEx
(
pResList
,
freeBlock
);
int64_t
curOwner
=
0
;
...
...
@@ -774,6 +774,14 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
return
TSDB_CODE_SUCCESS
;
}
int32_t
qStreamScanMemData
(
qTaskInfo_t
tinfo
,
const
SSubmitReq
*
pReq
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
);
ASSERT
(
pTaskInfo
->
streamInfo
.
pReq
==
NULL
);
pTaskInfo
->
streamInfo
.
pReq
=
pReq
;
return
0
;
}
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
STqOffsetVal
*
pOffset
,
int8_t
subType
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
7d1a2abe
...
...
@@ -53,7 +53,7 @@ static void destroyIndefinitOperatorInfo(void* param) {
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SProjectPhysiNode
*
pProjPhyNode
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SProjectOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SProjectOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -184,7 +184,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
if
(
pLimitInfo
->
limit
.
limit
>=
0
&&
pLimitInfo
->
numOfOutputRows
+
pBlock
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pBlock
,
keepRows
);
//TODO: optimize it later when partition by + limit
//
TODO: optimize it later when partition by + limit
if
((
pLimitInfo
->
slimit
.
limit
==
-
1
&&
pLimitInfo
->
currentGroupId
==
0
)
||
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
))
{
doSetOperatorCompleted
(
pOperator
);
...
...
@@ -206,9 +206,16 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
blockDataCleanup
(
pFinalRes
);
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
pTaskInfo
->
streamInfo
.
pReq
)
{
pOperator
->
status
=
OP_OPENED
;
}
qDebug
(
"enter project"
);
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
)
{
pOperator
->
status
=
OP_OPENED
;
qDebug
(
"projection in queue model, set status open and return null"
);
return
NULL
;
}
...
...
@@ -237,9 +244,23 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
&&
pFinalRes
->
info
.
rows
==
0
)
{
pOperator
->
status
=
OP_OPENED
;
if
(
pOperator
->
status
==
OP_EXEC_RECV
)
{
continue
;
}
else
{
return
NULL
;
}
}
qDebug
(
"set op close, exec %d, status %d rows %d"
,
pTaskInfo
->
execModel
,
pOperator
->
status
,
pFinalRes
->
info
.
rows
);
doSetOperatorCompleted
(
pOperator
);
break
;
}
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
)
{
qDebug
(
"set status recv"
);
pOperator
->
status
=
OP_EXEC_RECV
;
}
// for stream interval
if
(
pBlock
->
info
.
type
==
STREAM_RETRIEVE
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
||
...
...
@@ -298,6 +319,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if
(
pFinalRes
->
info
.
rows
>
0
||
(
pOperator
->
status
==
OP_EXEC_DONE
))
{
qDebug
(
"project return %d rows, status %d"
,
pFinalRes
->
info
.
rows
,
pOperator
->
status
);
break
;
}
}
else
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
7d1a2abe
...
...
@@ -1435,6 +1435,43 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
qDebug
(
"queue scan called"
);
if
(
pTaskInfo
->
streamInfo
.
pReq
!=
NULL
)
{
if
(
pInfo
->
tqReader
->
pMsg
==
NULL
)
{
pInfo
->
tqReader
->
pMsg
=
pTaskInfo
->
streamInfo
.
pReq
;
const
SSubmitReq
*
pSubmit
=
pInfo
->
tqReader
->
pMsg
;
if
(
tqReaderSetDataMsg
(
pInfo
->
tqReader
,
pSubmit
,
0
)
<
0
)
{
qError
(
"submit msg messed up when initing stream submit block %p"
,
pSubmit
);
pInfo
->
tqReader
->
pMsg
=
NULL
;
pTaskInfo
->
streamInfo
.
pReq
=
NULL
;
ASSERT
(
0
);
}
}
blockDataCleanup
(
pInfo
->
pRes
);
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
while
(
tqNextDataBlock
(
pInfo
->
tqReader
))
{
SSDataBlock
block
=
{
0
};
int32_t
code
=
tqRetrieveDataBlock
(
&
block
,
pInfo
->
tqReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
block
.
info
.
rows
==
0
)
{
continue
;
}
setBlockIntoRes
(
pInfo
,
&
block
);
if
(
pBlockInfo
->
rows
>
0
)
{
return
pInfo
->
pRes
;
}
}
pInfo
->
tqReader
->
pMsg
=
NULL
;
pTaskInfo
->
streamInfo
.
pReq
=
NULL
;
return
NULL
;
}
if
(
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
pTableScanOp
);
if
(
pResult
&&
pResult
->
info
.
rows
>
0
)
{
...
...
@@ -1467,8 +1504,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if
(
setBlockIntoRes
(
pInfo
,
&
ret
.
data
)
<
0
)
{
ASSERT
(
0
);
}
// TODO clean data block
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
pOperator
->
status
=
OP_EXEC_RECV
;
qDebug
(
"queue scan log return %d rows"
,
pInfo
->
pRes
->
info
.
rows
);
return
pInfo
->
pRes
;
}
...
...
@@ -1477,18 +1514,19 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
// pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL;
}
else
if
(
ret
.
fetchType
==
FETCH_TYPE__NONE
)
{
}
else
if
(
ret
.
fetchType
==
FETCH_TYPE__NONE
||
(
ret
.
fetchType
==
FETCH_TYPE__SEP
&&
pOperator
->
status
==
OP_EXEC_RECV
))
{
pTaskInfo
->
streamInfo
.
lastStatus
=
ret
.
offset
;
ASSERT
(
pTaskInfo
->
streamInfo
.
lastStatus
.
version
>=
pTaskInfo
->
streamInfo
.
prepareStatus
.
version
);
ASSERT
(
pTaskInfo
->
streamInfo
.
lastStatus
.
version
+
1
==
pInfo
->
tqReader
->
pWalReader
->
curVersion
);
char
formatBuf
[
80
];
tFormatOffset
(
formatBuf
,
80
,
&
ret
.
offset
);
qDebug
(
"queue scan log return null, offset %s"
,
formatBuf
);
pOperator
->
status
=
OP_OPENED
;
return
NULL
;
}
else
{
ASSERT
(
0
);
}
}
#if 0
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
...
...
@@ -1497,6 +1535,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
}
qDebug("stream scan tsdb return null");
return NULL;
#endif
}
else
{
ASSERT
(
0
);
return
NULL
;
...
...
tests/docs-examples-test/node.sh
浏览文件 @
7d1a2abe
...
...
@@ -23,7 +23,7 @@ node query_example.js
node async_query_example.js
node subscribe_demo.js
#
node subscribe_demo.js
taos
-s
"drop topic if exists topic_name_example"
taos
-s
"drop database if exists power"
...
...
@@ -39,4 +39,4 @@ taos -s "drop database if exists test"
node opentsdb_telnet_example.js
taos
-s
"drop database if exists test"
node opentsdb_json_example.js
\ No newline at end of file
node opentsdb_json_example.js
tests/script/jenkins/basic.txt
浏览文件 @
7d1a2abe
...
...
@@ -303,7 +303,7 @@
./test.sh -f tsim/insert/backquote.sim -m
# unsupport ./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
./test.sh -f tsim/query/interval-offset.sim -m
./test.sh -f tsim/tmq/basic3.sim -m
# unsupport
./test.sh -f tsim/tmq/basic3.sim -m
./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/qnode/basic1.sim -m
# unsupport ./test.sh -f tsim/mnode/basic1.sim -m
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录