Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ebc9e438
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
ebc9e438
编写于
7月 08, 2022
作者:
L
Liu Jicong
提交者:
GitHub
7月 08, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14690 from taosdata/feature/stream
refactor(stream): internal refactor
上级
236c1d02
0fd4f3b5
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
428 addition
and
160 deletion
+428
-160
include/common/tcommon.h
include/common/tcommon.h
+4
-2
include/common/tdatablock.h
include/common/tdatablock.h
+1
-1
include/libs/executor/executor.h
include/libs/executor/executor.h
+7
-1
include/libs/wal/wal.h
include/libs/wal/wal.h
+1
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+1
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+5
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+3
-0
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+50
-11
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+53
-6
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+18
-8
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+102
-92
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+3
-3
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+40
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+4
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+121
-13
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+12
-18
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
tests/system-test/failed.txt
tests/system-test/failed.txt
+1
-0
未找到文件。
include/common/tcommon.h
浏览文件 @
ebc9e438
...
...
@@ -55,7 +55,8 @@ enum {
enum
{
STREAM_INPUT__DATA_SUBMIT
=
1
,
STREAM_INPUT__DATA_BLOCK
,
STREAM_INPUT__DATA_SCAN
,
STREAM_INPUT__TABLE_SCAN
,
STREAM_INPUT__TQ_SCAN
,
STREAM_INPUT__DATA_RETRIEVE
,
STREAM_INPUT__TRIGGER
,
STREAM_INPUT__CHECKPOINT
,
...
...
@@ -122,7 +123,8 @@ enum {
};
typedef
struct
{
int8_t
fetchType
;
int8_t
fetchType
;
STqOffsetVal
offset
;
union
{
SSDataBlock
data
;
void
*
meta
;
...
...
include/common/tdatablock.h
浏览文件 @
ebc9e438
...
...
@@ -231,7 +231,7 @@ SSDataBlock* createDataBlock();
int32_t
blockDataAppendColInfo
(
SSDataBlock
*
pBlock
,
SColumnInfoData
*
pColInfoData
);
SColumnInfoData
createColumnInfoData
(
int16_t
type
,
int32_t
bytes
,
int16_t
colId
);
SColumnInfoData
*
bdGetColumnInfoData
(
SSDataBlock
*
pBlock
,
int32_t
index
);
SColumnInfoData
*
bdGetColumnInfoData
(
const
SSDataBlock
*
pBlock
,
int32_t
index
);
void
blockEncode
(
const
SSDataBlock
*
pBlock
,
char
*
data
,
int32_t
*
dataLen
,
int32_t
numOfCols
,
int8_t
needCompress
);
const
char
*
blockDecode
(
SSDataBlock
*
pBlock
,
int32_t
numOfCols
,
int32_t
numOfRows
,
const
char
*
pData
);
...
...
include/libs/executor/executor.h
浏览文件 @
ebc9e438
...
...
@@ -174,7 +174,13 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
*/
int32_t
qGetStreamScanStatus
(
qTaskInfo_t
tinfo
,
uint64_t
*
uid
,
int64_t
*
ts
);
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
uint64_t
uid
,
int64_t
ts
);
int32_t
qStreamPrepareTsdbScan
(
qTaskInfo_t
tinfo
,
uint64_t
uid
,
int64_t
ts
);
int32_t
qStreamPrepareScan1
(
qTaskInfo_t
tinfo
,
const
STqOffsetVal
*
pOffset
);
int32_t
qStreamExtractOffset
(
qTaskInfo_t
tinfo
,
STqOffsetVal
*
pOffset
);
void
*
qStreamExtractMetaMsg
(
qTaskInfo_t
tinfo
);
void
*
qExtractReaderFromStreamScanner
(
void
*
scanner
);
int32_t
qExtractStreamScanner
(
qTaskInfo_t
tinfo
,
void
**
scanner
);
...
...
include/libs/wal/wal.h
浏览文件 @
ebc9e438
...
...
@@ -194,6 +194,7 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
SWalReader
*
walOpenReader
(
SWal
*
,
SWalFilterCond
*
pCond
);
void
walCloseReader
(
SWalReader
*
pRead
);
int32_t
walReadVer
(
SWalReader
*
pRead
,
int64_t
ver
);
int32_t
walReadSeekVer
(
SWalReader
*
pRead
,
int64_t
ver
);
int32_t
walNextValidMsg
(
SWalReader
*
pRead
);
// only for tq usage
...
...
source/common/src/tdatablock.c
浏览文件 @
ebc9e438
...
...
@@ -1356,7 +1356,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId)
return
col
;
}
SColumnInfoData
*
bdGetColumnInfoData
(
SSDataBlock
*
pBlock
,
int32_t
index
)
{
SColumnInfoData
*
bdGetColumnInfoData
(
const
SSDataBlock
*
pBlock
,
int32_t
index
)
{
ASSERT
(
pBlock
!=
NULL
);
if
(
index
>=
taosArrayGetSize
(
pBlock
->
pDataBlock
))
{
return
NULL
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
ebc9e438
...
...
@@ -546,7 +546,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
char
cgroup
[
TSDB_CGROUP_LEN
];
mndSplitSubscribeKey
(
pRebInfo
->
key
,
topic
,
cgroup
,
true
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
topic
);
ASSERT
(
pTopic
);
/*ASSERT(pTopic);*/
if
(
pTopic
==
NULL
)
{
mError
(
"rebalance %s failed since topic %s was dropped, abort"
,
pRebInfo
->
key
,
topic
);
continue
;
}
taosRLockLatch
(
&
pTopic
->
lock
);
rebOutput
.
pSub
=
mndCreateSub
(
pMnode
,
pTopic
,
pRebInfo
->
key
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
ebc9e438
...
...
@@ -174,6 +174,9 @@ int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t
tqReaderAddTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
);
int32_t
tqReaderRemoveTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
);
int32_t
tqSeekVer
(
STqReader
*
pReader
,
int64_t
ver
);
int32_t
tqNextBlock
(
STqReader
*
pReader
,
SFetchRet
*
ret
);
int32_t
tqReaderSetDataMsg
(
STqReader
*
pReader
,
SSubmitReq
*
pMsg
,
int64_t
ver
);
bool
tqNextDataBlock
(
STqReader
*
pReader
);
bool
tqNextDataBlockFilterOut
(
STqReader
*
pReader
,
SHashObj
*
filterOutUids
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
ebc9e438
...
...
@@ -129,6 +129,7 @@ typedef struct {
static
STqMgmt
tqMgmt
=
{
0
};
// tqRead
int64_t
tqScanLog
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
offset
);
int64_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
pHeadWithCkSum
);
// tqExec
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
ebc9e438
...
...
@@ -244,11 +244,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal
fetchOffsetNew
;
// 1.find handle
char
buf
[
80
];
tFormatOffset
(
buf
,
80
,
&
reqOffset
);
tqDebug
(
"tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
buf
);
STqHandle
*
pHandle
=
taosHashGet
(
pTq
->
handles
,
pReq
->
subKey
,
strlen
(
pReq
->
subKey
));
/*ASSERT(pHandle);*/
if
(
pHandle
==
NULL
)
{
...
...
@@ -270,6 +265,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
consumerEpoch
=
atomic_val_compare_exchange_32
(
&
pHandle
->
epoch
,
consumerEpoch
,
reqEpoch
);
}
char
buf
[
80
];
tFormatOffset
(
buf
,
80
,
&
reqOffset
);
tqDebug
(
"tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s"
,
consumerId
,
pReq
->
epoch
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
buf
);
// 2.reset offset if needed
if
(
reqOffset
.
type
>
0
)
{
fetchOffsetNew
=
reqOffset
;
...
...
@@ -279,7 +279,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffsetNew
=
pOffset
->
val
;
char
formatBuf
[
80
];
tFormatOffset
(
formatBuf
,
80
,
&
fetchOffsetNew
);
tqDebug
(
"tmq poll: consumer %ld,
offset reset to %s"
,
consumerId
,
formatBuf
);
tqDebug
(
"tmq poll: consumer %ld,
subkey %s, offset reset to %s"
,
consumerId
,
pHandle
->
subKey
,
formatBuf
);
}
else
{
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEAST
)
{
if
(
pReq
->
useSnapshot
&&
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
...
...
@@ -294,9 +294,29 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_LATEST
)
{
tqOffsetResetToLog
(
&
fetchOffsetNew
,
walGetLastVer
(
pTq
->
pVnode
->
pWal
));
tqDebug
(
"tmq poll: consumer %ld, subkey %s, offset reset to %ld"
,
consumerId
,
pHandle
->
subKey
,
fetchOffsetNew
.
version
);
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReq
,
pHandle
->
execHandle
.
subType
);
dataRsp
.
rspOffset
=
fetchOffsetNew
;
code
=
0
;
if
(
tqSendDataRsp
(
pTq
,
pMsg
,
pReq
,
&
dataRsp
)
<
0
)
{
code
=
-
1
;
}
taosArrayDestroy
(
dataRsp
.
blockDataLen
);
taosArrayDestroyP
(
dataRsp
.
blockData
,
(
FDelete
)
taosMemoryFree
);
if
(
dataRsp
.
withSchema
)
{
taosArrayDestroyP
(
dataRsp
.
blockSchema
,
(
FDelete
)
tDeleteSSchemaWrapper
);
}
if
(
dataRsp
.
withTbName
)
{
taosArrayDestroyP
(
dataRsp
.
blockTbName
,
(
FDelete
)
taosMemoryFree
);
}
return
code
;
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_NONE
)
{
tqError
(
"tmq poll:
no offset committed for consumer %ld in vg %d, subkey %s, reset none failed"
,
consumerId
,
TD_VID
(
pTq
->
pVnode
),
pReq
->
subKey
);
tqError
(
"tmq poll:
subkey %s, no offset committed for consumer %ld in vg %d, subkey %s, reset none failed"
,
pHandle
->
subKey
,
consumerId
,
TD_VID
(
pTq
->
pVnode
),
pReq
->
subKey
);
terrno
=
TSDB_CODE_TQ_NO_COMMITTED_OFFSET
;
return
-
1
;
}
...
...
@@ -307,7 +327,24 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReq
,
pHandle
->
execHandle
.
subType
);
if
(
fetchOffsetNew
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
&&
fetchOffsetNew
.
type
==
TMQ_OFFSET__LOG
)
{
fetchOffsetNew
.
version
++
;
if
(
tqScanLog
(
pTq
,
&
pHandle
->
execHandle
,
&
dataRsp
,
&
fetchOffsetNew
)
<
0
)
{
ASSERT
(
0
);
code
=
-
1
;
goto
OVER
;
}
if
(
dataRsp
.
blockNum
==
0
)
{
// TODO add to async task
/*dataRsp.rspOffset.version--;*/
}
if
(
tqSendDataRsp
(
pTq
,
pMsg
,
pReq
,
&
dataRsp
)
<
0
)
{
code
=
-
1
;
}
goto
OVER
;
}
if
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__COLUMN
&&
fetchOffsetNew
.
type
==
TMQ_OFFSET__LOG
)
{
int64_t
fetchVer
=
fetchOffsetNew
.
version
+
1
;
SWalCkHead
*
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
if
(
pCkHead
==
NULL
)
{
...
...
@@ -319,8 +356,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
while
(
1
)
{
consumerEpoch
=
atomic_load_32
(
&
pHandle
->
epoch
);
if
(
consumerEpoch
>
reqEpoch
)
{
tqWarn
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d, discard req epoch %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchVer
,
consumerEpoch
,
reqEpoch
);
tqWarn
(
"tmq poll: consumer %ld (epoch %d), subkey %s, vg %d offset %ld, found new consumer epoch %d, discard req "
"epoch %d"
,
consumerId
,
pReq
->
epoch
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
fetchVer
,
consumerEpoch
,
reqEpoch
);
break
;
}
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
ebc9e438
...
...
@@ -46,7 +46,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerI
return
0
;
}
static
int32_t
tqAddTbNameToRsp
(
const
STQ
*
pTq
,
int64_t
uid
,
SMqDataRsp
*
pRsp
,
int32_t
workerId
)
{
static
int32_t
tqAddTbNameToRsp
(
const
STQ
*
pTq
,
int64_t
uid
,
SMqDataRsp
*
pRsp
)
{
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pTq
->
pVnode
->
pMeta
,
0
);
if
(
metaGetTableEntryByUid
(
&
mr
,
uid
)
<
0
)
{
...
...
@@ -59,6 +59,53 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
return
0
;
}
int64_t
tqScanLog
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
)
{
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
0
];
if
(
qStreamPrepareScan1
(
task
,
pOffset
)
<
0
)
{
pRsp
->
rspOffset
=
*
pOffset
;
pRsp
->
rspOffset
.
version
--
;
return
0
;
}
while
(
1
)
{
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
}
if
(
pDataBlock
!=
NULL
)
{
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
);
if
(
pRsp
->
withTbName
)
{
int64_t
uid
=
pExec
->
pExecReader
[
0
]
->
msgIter
.
uid
;
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
);
}
pRsp
->
blockNum
++
;
continue
;
}
void
*
meta
=
qStreamExtractMetaMsg
(
task
);
if
(
meta
!=
NULL
)
{
// tq add meta to rsp
}
if
(
qStreamExtractOffset
(
task
,
&
pRsp
->
rspOffset
)
<
0
)
{
ASSERT
(
0
);
}
if
(
pRsp
->
rspOffset
.
type
==
TMQ_OFFSET__LOG
)
{
ASSERT
(
pRsp
->
rspOffset
.
version
+
1
>=
pRsp
->
reqOffset
.
version
);
}
ASSERT
(
pRsp
->
rspOffset
.
type
!=
0
);
break
;
}
return
0
;
}
int32_t
tqScanSnapshot
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
offset
,
int32_t
workerId
)
{
ASSERT
(
pExec
->
subType
==
TOPIC_SUB_TYPE__COLUMN
);
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
workerId
];
...
...
@@ -67,7 +114,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
/*ASSERT(0);*/
/*}*/
if
(
qStreamPrepareScan
(
task
,
offset
.
uid
,
offset
.
ts
)
<
0
)
{
if
(
qStreamPrepare
Tsdb
Scan
(
task
,
offset
.
uid
,
offset
.
ts
)
<
0
)
{
ASSERT
(
0
);
}
...
...
@@ -93,7 +140,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
ASSERT(0);
}
tqAddTbNameToRsp(pTq, uid, pRsp
, workerId
);
tqAddTbNameToRsp(pTq, uid, pRsp);
#endif
}
pRsp
->
blockNum
++
;
...
...
@@ -129,7 +176,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
);
if
(
pRsp
->
withTbName
)
{
int64_t
uid
=
pExec
->
pExecReader
[
workerId
]
->
msgIter
.
uid
;
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
,
workerId
);
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
);
}
pRsp
->
blockNum
++
;
}
...
...
@@ -146,7 +193,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockDataToRsp
(
&
block
,
pRsp
);
if
(
pRsp
->
withTbName
)
{
int64_t
uid
=
pExec
->
pExecReader
[
workerId
]
->
msgIter
.
uid
;
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
,
workerId
);
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
);
}
tqAddBlockSchemaToRsp
(
pExec
,
workerId
,
pRsp
);
pRsp
->
blockNum
++
;
...
...
@@ -164,7 +211,7 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockDataToRsp
(
&
block
,
pRsp
);
if
(
pRsp
->
withTbName
)
{
int64_t
uid
=
pExec
->
pExecReader
[
workerId
]
->
msgIter
.
uid
;
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
,
workerId
);
tqAddTbNameToRsp
(
pTq
,
uid
,
pRsp
);
}
tqAddBlockSchemaToRsp
(
pExec
,
workerId
,
pRsp
);
pRsp
->
blockNum
++
;
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
ebc9e438
...
...
@@ -15,11 +15,6 @@
#include "tq.h"
int64_t
tqScanLog
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
offset
)
{
/*if ()*/
return
0
;
}
int64_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
ppCkHead
)
{
int32_t
code
=
0
;
taosThreadMutexLock
(
&
pHandle
->
pWalReader
->
mutex
);
...
...
@@ -84,8 +79,10 @@ STqReader* tqOpenReader(SVnode* pVnode) {
return
NULL
;
}
// TODO open
/*pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);*/
pReader
->
pWalReader
=
walOpenReader
(
pVnode
->
pWal
,
NULL
);
if
(
pReader
->
pWalReader
==
NULL
)
{
return
NULL
;
}
pReader
->
pVnodeMeta
=
pVnode
->
pMeta
;
pReader
->
pMsg
=
NULL
;
...
...
@@ -106,12 +103,19 @@ void tqCloseReader(STqReader* pReader) {
taosMemoryFree
(
pReader
);
}
int32_t
tqSeekVer
(
STqReader
*
pReader
,
int64_t
ver
)
{
//
return
walReadSeekVer
(
pReader
->
pWalReader
,
ver
);
}
int32_t
tqNextBlock
(
STqReader
*
pReader
,
SFetchRet
*
ret
)
{
bool
fromProcessedMsg
=
pReader
->
pMsg
!=
NULL
;
while
(
1
)
{
if
(
!
fromProcessedMsg
)
{
if
(
walNextValidMsg
(
pReader
->
pWalReader
)
<
0
)
{
ret
->
offset
.
type
=
TMQ_OFFSET__LOG
;
ret
->
offset
.
version
=
pReader
->
ver
;
ret
->
fetchType
=
FETCH_TYPE__NONE
;
return
-
1
;
}
...
...
@@ -130,19 +134,25 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
memset
(
&
ret
->
data
,
0
,
sizeof
(
SSDataBlock
));
int32_t
code
=
tqRetrieveDataBlock
(
&
ret
->
data
,
pReader
);
if
(
code
!=
0
||
ret
->
data
.
info
.
rows
==
0
)
{
ASSERT
(
0
);
continue
;
#if 0
if (fromProcessedMsg) {
ret->fetchType = FETCH_TYPE__NONE;
return 0;
} else {
break;
}
#endif
}
ret
->
fetchType
=
FETCH_TYPE__DATA
;
return
0
;
}
if
(
fromProcessedMsg
)
{
ret
->
offset
.
type
=
TMQ_OFFSET__LOG
;
ret
->
offset
.
version
=
pReader
->
ver
;
ASSERT
(
pReader
->
ver
!=
-
1
);
ret
->
fetchType
=
FETCH_TYPE__NONE
;
return
0
;
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
ebc9e438
...
...
@@ -51,13 +51,12 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
#define START_TS_COLUMN_INDEX 0
#define END_TS_COLUMN_INDEX 1
#define UID_COLUMN_INDEX 2
#define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX
#define START_TS_COLUMN_INDEX
0
#define END_TS_COLUMN_INDEX
1
#define UID_COLUMN_INDEX
2
#define GROUPID_COLUMN_INDEX
UID_COLUMN_INDEX
#define DELETE_GROUPID_COLUMN_INDEX 2
enum
{
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED
=
0x1u
,
...
...
@@ -81,8 +80,8 @@ typedef struct SResultInfo { // TODO refactor
}
SResultInfo
;
typedef
struct
STableQueryInfo
{
TSKEY
lastKey
;
// last check ts, todo remove it later
SResultRowPosition
pos
;
// current active time window
TSKEY
lastKey
;
// last check ts, todo remove it later
SResultRowPosition
pos
;
// current active time window
}
STableQueryInfo
;
typedef
struct
SLimit
{
...
...
@@ -105,7 +104,7 @@ typedef struct STaskCostInfo {
uint64_t
loadDataTime
;
SFileBlockLoadRecorder
*
pRecoder
;
uint64_t
elapsedTime
;
uint64_t
elapsedTime
;
uint64_t
firstStageMergeTime
;
uint64_t
winInfoSize
;
...
...
@@ -118,8 +117,8 @@ typedef struct STaskCostInfo {
}
STaskCostInfo
;
typedef
struct
SOperatorCostInfo
{
double
openCost
;
double
totalCost
;
double
openCost
;
double
totalCost
;
}
SOperatorCostInfo
;
struct
SOperatorInfo
;
...
...
@@ -139,24 +138,35 @@ typedef struct STaskIdInfo {
char
*
str
;
}
STaskIdInfo
;
typedef
struct
{
STqOffsetVal
prepareStatus
;
// for tmq
STqOffsetVal
lastStatus
;
// for tmq
void
*
metaBlk
;
// for tmq fetching meta
SSDataBlock
*
pullOverBlk
;
// for streaming
SWalFilterCond
cond
;
}
SStreamTaskInfo
;
typedef
struct
SExecTaskInfo
{
STaskIdInfo
id
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
STaskIdInfo
id
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
SStreamTaskInfo
streamInfo
;
struct
{
char
*
tablename
;
char
*
dbname
;
int32_t
tversion
;
SSchemaWrapper
*
sw
;
char
*
tablename
;
char
*
dbname
;
int32_t
tversion
;
SSchemaWrapper
*
sw
;
}
schemaVer
;
STableListInfo
tableqinfoList
;
// this is a table list
const
char
*
sql
;
// query sql string
jmp_buf
env
;
// jump to this position when error happens.
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
STableListInfo
tableqinfoList
;
// this is a table list
const
char
*
sql
;
// query sql string
jmp_buf
env
;
// jump to this position when error happens.
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
struct
SOperatorInfo
*
pRoot
;
}
SExecTaskInfo
;
...
...
@@ -168,36 +178,36 @@ enum {
};
typedef
struct
SOperatorFpSet
{
__optr_open_fn_t
_openFn
;
// DO NOT invoke this function directly
__optr_fn_t
getNextFn
;
__optr_fn_t
getStreamResFn
;
// execute the aggregate in the stream model, todo remove it
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
__optr_explain_fn_t
getExplainFn
;
__optr_open_fn_t
_openFn
;
// DO NOT invoke this function directly
__optr_fn_t
getNextFn
;
__optr_fn_t
getStreamResFn
;
// execute the aggregate in the stream model, todo remove it
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
__optr_explain_fn_t
getExplainFn
;
}
SOperatorFpSet
;
typedef
struct
SExprSupp
{
SExprInfo
*
pExprInfo
;
int32_t
numOfExprs
;
// the number of scalar expression in group operator
int32_t
numOfExprs
;
// the number of scalar expression in group operator
SqlFunctionCtx
*
pCtx
;
int32_t
*
rowEntryInfoOffset
;
// offset value for each row result cell info
}
SExprSupp
;
typedef
struct
SOperatorInfo
{
uint8_t
operatorType
;
bool
blocking
;
// block operator or not
uint8_t
status
;
// denote if current operator is completed
char
*
name
;
// name, for debug purpose
void
*
info
;
// extension attribution
SExprSupp
exprSupp
;
SExecTaskInfo
*
pTaskInfo
;
SOperatorCostInfo
cost
;
SResultInfo
resultInfo
;
struct
SOperatorInfo
**
pDownstream
;
// downstram pointer list
int32_t
numOfDownstream
;
// number of downstream. The value is always ONE expect for join operator
SOperatorFpSet
fpSet
;
uint8_t
operatorType
;
bool
blocking
;
// block operator or not
uint8_t
status
;
// denote if current operator is completed
char
*
name
;
// name, for debug purpose
void
*
info
;
// extension attribution
SExprSupp
exprSupp
;
SExecTaskInfo
*
pTaskInfo
;
SOperatorCostInfo
cost
;
SResultInfo
resultInfo
;
struct
SOperatorInfo
**
pDownstream
;
// downstram pointer list
int32_t
numOfDownstream
;
// number of downstream. The value is always ONE expect for join operator
SOperatorFpSet
fpSet
;
}
SOperatorInfo
;
typedef
enum
{
...
...
@@ -210,12 +220,12 @@ typedef enum {
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef
struct
SSourceDataInfo
{
int32_t
index
;
SRetrieveTableRsp
*
pRsp
;
uint64_t
totalRows
;
int32_t
code
;
EX_SOURCE_STATUS
status
;
const
char
*
taskId
;
int32_t
index
;
SRetrieveTableRsp
*
pRsp
;
uint64_t
totalRows
;
int32_t
code
;
EX_SOURCE_STATUS
status
;
const
char
*
taskId
;
}
SSourceDataInfo
;
typedef
struct
SLoadRemoteDataInfo
{
...
...
@@ -325,10 +335,10 @@ typedef enum EStreamScanMode {
}
EStreamScanMode
;
typedef
struct
SCatchSupporter
{
SHashObj
*
pWindowHashTable
;
// quick locate the window object for each window
SDiskbasedBuf
*
pDataBuf
;
// buffer based on blocked-wised disk file
int32_t
keySize
;
int64_t
*
pKeyBuf
;
SHashObj
*
pWindowHashTable
;
// quick locate the window object for each window
SDiskbasedBuf
*
pDataBuf
;
// buffer based on blocked-wised disk file
int32_t
keySize
;
int64_t
*
pKeyBuf
;
}
SCatchSupporter
;
typedef
struct
SStreamAggSupporter
{
...
...
@@ -344,48 +354,48 @@ typedef struct SStreamAggSupporter {
typedef
struct
SessionWindowSupporter
{
SStreamAggSupporter
*
pStreamAggSup
;
int64_t
gap
;
uint8_t
parentType
;
int64_t
gap
;
uint8_t
parentType
;
}
SessionWindowSupporter
;
typedef
struct
SStreamScanInfo
{
uint64_t
tableUid
;
// queried super table uid
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
int32_t
primaryTsIndex
;
// primary time stamp slot id
SReadHandle
readHandle
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here.
SArray
*
pColMatchInfo
;
//
SNode
*
pCondition
;
SArray
*
pBlockLists
;
// multiple SSDatablock.
SSDataBlock
*
pRes
;
// result SSDataBlock
SSDataBlock
*
pUpdateRes
;
// update SSDataBlock
int32_t
updateResIndex
;
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
uint64_t
numOfExec
;
// execution times
STqReader
*
tqReader
;
int32_t
tsArrayIndex
;
SArray
*
tsArray
;
uint64_t
groupId
;
SUpdateInfo
*
pUpdateInfo
;
uint64_t
tableUid
;
// queried super table uid
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
int32_t
primaryTsIndex
;
// primary time stamp slot id
SReadHandle
readHandle
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here.
SArray
*
pColMatchInfo
;
//
SNode
*
pCondition
;
EStreamScanMode
scanMode
;
SOperatorInfo
*
pStreamScanOp
;
SOperatorInfo
*
pTableScanOp
;
SArray
*
childIds
;
SArray
*
pBlockLists
;
// multiple SSDatablock.
SSDataBlock
*
pRes
;
// result SSDataBlock
SSDataBlock
*
pUpdateRes
;
// update SSDataBlock
int32_t
updateResIndex
;
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
uint64_t
numOfExec
;
// execution times
STqReader
*
tqReader
;
int32_t
tsArrayIndex
;
SArray
*
tsArray
;
uint64_t
groupId
;
SUpdateInfo
*
pUpdateInfo
;
EStreamScanMode
scanMode
;
SOperatorInfo
*
pStreamScanOp
;
SOperatorInfo
*
pTableScanOp
;
SArray
*
childIds
;
SessionWindowSupporter
sessionSup
;
bool
assignBlockUid
;
// assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t
scanWinIndex
;
// for state operator
int32_t
pullDataResIndex
;
SSDataBlock
*
pPullDataRes
;
// pull data SSDataBlock
SSDataBlock
*
pDeleteDataRes
;
// delete data SSDataBlock
int32_t
deleteDataIndex
;
bool
assignBlockUid
;
// assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t
scanWinIndex
;
// for state operator
int32_t
pullDataResIndex
;
SSDataBlock
*
pPullDataRes
;
// pull data SSDataBlock
SSDataBlock
*
pDeleteDataRes
;
// delete data SSDataBlock
int32_t
deleteDataIndex
;
// status for tmq
//SSchemaWrapper schema;
//
SSchemaWrapper schema;
STqOffset
offset
;
}
SStreamScanInfo
;
...
...
@@ -595,7 +605,7 @@ typedef struct SSessionAggOperatorInfo {
int64_t
gap
;
// session window gap
int32_t
tsSlotId
;
// primary timestamp slot id
STimeWindowAggSupp
twAggSup
;
SNode
*
pCondition
;
const
SNode
*
pCondition
;
}
SSessionAggOperatorInfo
;
typedef
struct
SResultWindowInfo
{
...
...
@@ -657,7 +667,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t
tsSlotId
;
// primary timestamp column slot id
STimeWindowAggSupp
twAggSup
;
// bool reptScan;
const
SNode
*
pCondition
;
const
SNode
*
pCondition
;
}
SStateWindowOperatorInfo
;
typedef
struct
SStreamStateAggOperatorInfo
{
...
...
source/libs/executor/src/executor.c
浏览文件 @
ebc9e438
...
...
@@ -60,9 +60,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll
(
p
->
pDataBlock
,
pDataBlock
->
pDataBlock
);
taosArrayPush
(
pInfo
->
pBlockLists
,
&
p
);
}
}
else
if
(
type
==
STREAM_INPUT__
DATA
_SCAN
)
{
}
else
if
(
type
==
STREAM_INPUT__
TABLE
_SCAN
)
{
// do nothing
ASSERT
(
pInfo
->
blockType
==
STREAM_INPUT__
DATA
_SCAN
);
ASSERT
(
pInfo
->
blockType
==
STREAM_INPUT__
TABLE
_SCAN
);
}
else
{
ASSERT
(
0
);
}
...
...
@@ -76,7 +76,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
return
TSDB_CODE_QRY_APP_ERROR
;
}
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
return
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
NULL
,
0
,
STREAM_INPUT__
DATA
_SCAN
,
0
,
NULL
);
return
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
NULL
,
0
,
STREAM_INPUT__
TABLE
_SCAN
,
0
,
NULL
);
}
int32_t
qSetStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
input
,
int32_t
type
,
bool
assignUid
)
{
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
ebc9e438
...
...
@@ -267,7 +267,46 @@ const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
return
&
pInfo
->
offset
;
}
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
uint64_t
uid
,
int64_t
ts
)
{
void
*
qStreamExtractMetaMsg
(
qTaskInfo_t
tinfo
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
);
return
pTaskInfo
->
streamInfo
.
metaBlk
;
}
int32_t
qStreamExtractOffset
(
qTaskInfo_t
tinfo
,
STqOffsetVal
*
pOffset
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
);
memcpy
(
pOffset
,
&
pTaskInfo
->
streamInfo
.
lastStatus
,
sizeof
(
STqOffsetVal
));
return
0
;
}
int32_t
qStreamPrepareScan1
(
qTaskInfo_t
tinfo
,
const
STqOffsetVal
*
pOffset
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
);
pTaskInfo
->
streamInfo
.
prepareStatus
=
*
pOffset
;
// TODO: optimize
/*if (pTaskInfo->streamInfo.lastStatus.type != pOffset->type ||*/
/*pTaskInfo->streamInfo.prepareStatus.version != pTaskInfo->streamInfo.lastStatus.version) {*/
while
(
1
)
{
uint8_t
type
=
pOperator
->
operatorType
;
pOperator
->
status
=
OP_OPENED
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pOffset
->
version
)
<
0
)
{
return
-
1
;
}
return
0
;
}
else
{
ASSERT
(
pOperator
->
numOfDownstream
==
1
);
pOperator
=
pOperator
->
pDownstream
[
0
];
}
}
/*}*/
return
0
;
}
int32_t
qStreamPrepareTsdbScan
(
qTaskInfo_t
tinfo
,
uint64_t
uid
,
int64_t
ts
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
if
(
uid
==
0
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
ebc9e438
...
...
@@ -2852,7 +2852,7 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pScanInfo
=
pOperator
->
info
;
pScanInfo
->
blockType
=
STREAM_INPUT__
DATA
_SCAN
;
pScanInfo
->
blockType
=
STREAM_INPUT__
TABLE
_SCAN
;
pScanInfo
->
pTableScanOp
->
status
=
OP_OPENED
;
...
...
@@ -3287,7 +3287,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
// TODO optimize
/*if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {*/
doSetOperatorCompleted
(
pOperator
);
/*}*/
break
;
}
if
(
pBlock
->
info
.
type
==
STREAM_RETRIEVE
)
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
ebc9e438
...
...
@@ -210,7 +210,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
bool
allColumnsHaveAgg
=
true
;
SColumnDataAgg
**
pColAgg
=
NULL
;
int32_t
code
=
tsdbRetrieveDatablockSMA
(
pTableScanInfo
->
dataReader
,
&
pColAgg
,
&
allColumnsHaveAgg
);
int32_t
code
=
tsdbRetrieveDatablockSMA
(
pTableScanInfo
->
dataReader
,
&
pColAgg
,
&
allColumnsHaveAgg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -595,7 +595,7 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
if
(
pTableScanInfo
->
pColMatchInfo
!=
NULL
)
{
taosArrayDestroy
(
pTableScanInfo
->
pColMatchInfo
);
}
taosMemoryFreeClear
(
param
);
}
...
...
@@ -745,7 +745,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
static
void
destroyBlockDistScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SBlockDistInfo
*
pDistInfo
=
(
SBlockDistInfo
*
)
param
;
blockDataDestroy
(
pDistInfo
->
pResBlock
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -1130,15 +1130,121 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
uidCol
[
i
]
=
getGroupId
(
pOperator
,
uidCol
[
i
]);
}
}
static
int32_t
setBlockIntoRes
(
SStreamScanInfo
*
pInfo
,
const
SSDataBlock
*
pBlock
)
{
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
SOperatorInfo
*
pOperator
=
pInfo
->
pStreamScanOp
;
SExecTaskInfo
*
pTaskInfo
=
pInfo
->
pStreamScanOp
->
pTaskInfo
;
pInfo
->
pRes
->
info
.
rows
=
pBlock
->
info
.
rows
;
pInfo
->
pRes
->
info
.
uid
=
pBlock
->
info
.
uid
;
pInfo
->
pRes
->
info
.
type
=
STREAM_NORMAL
;
pInfo
->
pRes
->
info
.
capacity
=
pBlock
->
info
.
rows
;
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if
(
pInfo
->
assignBlockUid
)
{
pInfo
->
pRes
->
info
.
groupId
=
pBlock
->
info
.
uid
;
}
uint64_t
*
groupIdPre
=
taosHashGet
(
pOperator
->
pTaskInfo
->
tableqinfoList
.
map
,
&
pBlock
->
info
.
uid
,
sizeof
(
int64_t
));
if
(
groupIdPre
)
{
pInfo
->
pRes
->
info
.
groupId
=
*
groupIdPre
;
}
else
{
pInfo
->
pRes
->
info
.
groupId
=
0
;
}
// todo extract method
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
pColMatchInfo
);
++
i
)
{
SColMatchInfo
*
pColMatchInfo
=
taosArrayGet
(
pInfo
->
pColMatchInfo
,
i
);
if
(
!
pColMatchInfo
->
output
)
{
continue
;
}
bool
colExists
=
false
;
for
(
int32_t
j
=
0
;
j
<
blockDataGetNumOfCols
(
pBlock
);
++
j
)
{
SColumnInfoData
*
pResCol
=
bdGetColumnInfoData
(
pBlock
,
j
);
if
(
pResCol
->
info
.
colId
==
pColMatchInfo
->
colId
)
{
taosArraySet
(
pInfo
->
pRes
->
pDataBlock
,
pColMatchInfo
->
targetSlotId
,
pResCol
);
colExists
=
true
;
break
;
}
}
// the required column does not exists in submit block, let's set it to be all null value
if
(
!
colExists
)
{
SColumnInfoData
*
pDst
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
pColMatchInfo
->
targetSlotId
);
colDataAppendNNULL
(
pDst
,
0
,
pBlockInfo
->
rows
);
}
}
taosArrayDestroy
(
pBlock
->
pDataBlock
);
ASSERT
(
pInfo
->
pRes
->
pDataBlock
!=
NULL
);
#if 0
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
pOperator->status = OP_EXEC_DONE;
pTaskInfo->code = terrno;
return -1;
}
#endif
// currently only the tbname pseudo column
if
(
pInfo
->
numOfPseudoExpr
>
0
)
{
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
);
}
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
);
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
if
(
pBlockInfo
->
rows
>
0
)
{
return
0
;
}
return
0
;
}
static
SSDataBlock
*
doStreamScan
(
SOperatorInfo
*
pOperator
)
{
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
pTaskInfo
->
code
=
pOperator
->
fpSet
.
_openFn
(
pOperator
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
||
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
/*pTaskInfo->code = pOperator->fpSet._openFn(pOperator);*/
/*if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {*/
/*return NULL;*/
/*}*/
if
(
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
==
TMQ_OFFSET__LOG
)
{
while
(
1
)
{
SFetchRet
ret
=
{
0
};
tqNextBlock
(
pInfo
->
tqReader
,
&
ret
);
if
(
ret
.
fetchType
==
FETCH_TYPE__DATA
)
{
blockDataCleanup
(
pInfo
->
pRes
);
if
(
setBlockIntoRes
(
pInfo
,
&
ret
.
data
)
<
0
)
{
ASSERT
(
0
);
}
/*pTaskInfo->streamInfo.lastStatus = ret.offset;*/
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
return
pInfo
->
pRes
;
/*} else {*/
/*tDeleteSSDataBlock(&ret.data);*/
}
}
else
if
(
ret
.
fetchType
==
FETCH_TYPE__META
)
{
ASSERT
(
0
);
pTaskInfo
->
streamInfo
.
lastStatus
=
ret
.
offset
;
pTaskInfo
->
streamInfo
.
metaBlk
=
ret
.
meta
;
return
NULL
;
}
else
if
(
ret
.
fetchType
==
FETCH_TYPE__NONE
)
{
if
(
ret
.
offset
.
version
==
-
1
)
{
pTaskInfo
->
streamInfo
.
lastStatus
.
type
=
TMQ_OFFSET__LOG
;
pTaskInfo
->
streamInfo
.
lastStatus
.
version
=
pTaskInfo
->
streamInfo
.
prepareStatus
.
version
-
1
;
}
else
{
pTaskInfo
->
streamInfo
.
lastStatus
=
ret
.
offset
;
}
return
NULL
;
}
else
{
ASSERT
(
0
);
}
}
}
size_t
total
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
...
...
@@ -1146,7 +1252,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if
(
pInfo
->
blockType
==
STREAM_INPUT__DATA_BLOCK
)
{
if
(
pInfo
->
validBlockIndex
>=
total
)
{
/*doClearBufferedBlocks(pInfo);*/
pOperator
->
status
=
OP_EXEC_DONE
;
/*pOperator->status = OP_EXEC_DONE;*/
return
NULL
;
}
...
...
@@ -1255,8 +1361,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo
->
pRes
->
info
.
type
=
STREAM_NORMAL
;
pInfo
->
pRes
->
info
.
capacity
=
block
.
info
.
rows
;
uint64_t
*
groupIdPre
=
taosHashGet
(
pOperator
->
pTaskInfo
->
tableqinfoList
.
map
,
&
block
.
info
.
uid
,
sizeof
(
int64_t
));
if
(
groupIdPre
)
{
pInfo
->
pRes
->
info
.
groupId
=
*
groupIdPre
;
...
...
@@ -1295,6 +1399,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
taosArrayDestroy
(
block
.
pDataBlock
);
ASSERT
(
pInfo
->
pRes
->
pDataBlock
!=
NULL
);
#if 0
if (pInfo->pRes->pDataBlock == NULL) {
// TODO add log
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
...
...
@@ -1302,6 +1409,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->code = terrno;
return NULL;
}
#endif
// currently only the tbname pseudo column
if
(
pInfo
->
numOfPseudoExpr
>
0
)
{
...
...
@@ -1321,7 +1429,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if
(
pBlockInfo
->
rows
==
0
)
{
updateInfoDestoryColseWinSBF
(
pInfo
->
pUpdateInfo
);
pOperator
->
status
=
OP_EXEC_DONE
;
/*pOperator->status = OP_EXEC_DONE;*/
}
else
if
(
pInfo
->
pUpdateInfo
)
{
pInfo
->
tsArrayIndex
=
0
;
checkUpdateData
(
pInfo
,
true
,
pInfo
->
pRes
,
true
);
...
...
@@ -1339,7 +1447,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return
(
pBlockInfo
->
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
else
if
(
pInfo
->
blockType
==
STREAM_INPUT__
DATA
_SCAN
)
{
}
else
if
(
pInfo
->
blockType
==
STREAM_INPUT__
TABLE
_SCAN
)
{
// check reader last status
// if not match, reset status
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
pTableScanOp
);
...
...
@@ -2177,7 +2285,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
static
void
destroyTagScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
STagScanInfo
*
pInfo
=
(
STagScanInfo
*
)
param
;
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -2669,7 +2777,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
pTableScanInfo
->
pSortInputBlock
=
blockDataDestroy
(
pTableScanInfo
->
pSortInputBlock
);
taosArrayDestroy
(
pTableScanInfo
->
pSortInfo
);
taosMemoryFreeClear
(
param
);
}
...
...
source/libs/wal/src/walRead.c
浏览文件 @
ebc9e438
...
...
@@ -147,9 +147,10 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
return
0
;
}
static
int32_t
walReadSeekVer
(
SWalReader
*
pRead
,
int64_t
ver
)
{
int32_t
walReadSeekVer
(
SWalReader
*
pRead
,
int64_t
ver
)
{
SWal
*
pWal
=
pRead
->
pWal
;
if
(
ver
==
pRead
->
curVersion
)
{
wDebug
(
"wal version %ld match, no need to reset"
,
ver
);
return
0
;
}
if
(
ver
>
pWal
->
vers
.
lastVer
||
ver
<
pWal
->
vers
.
firstVer
)
{
...
...
@@ -177,6 +178,8 @@ static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
return
-
1
;
}
wDebug
(
"wal version reset from %ld to %ld"
,
pRead
->
curVersion
,
ver
);
pRead
->
curVersion
=
ver
;
return
0
;
...
...
@@ -187,7 +190,10 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity
static
int32_t
walFetchHeadNew
(
SWalReader
*
pRead
,
int64_t
fetchVer
)
{
int64_t
contLen
;
if
(
pRead
->
curVersion
!=
fetchVer
)
{
if
(
walReadSeekVer
(
pRead
,
fetchVer
)
<
0
)
return
-
1
;
if
(
walReadSeekVer
(
pRead
,
fetchVer
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
contLen
=
taosReadFile
(
pRead
->
pLogFile
,
pRead
->
pHead
,
sizeof
(
SWalCkHead
));
if
(
contLen
!=
sizeof
(
SWalCkHead
))
{
...
...
@@ -196,6 +202,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
}
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
ASSERT
(
0
);
pRead
->
curVersion
=
-
1
;
return
-
1
;
}
...
...
@@ -249,6 +256,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
}
pRead
->
curVersion
=
ver
+
1
;
wDebug
(
"version advance to %ld, fetch body"
,
pRead
->
curVersion
);
return
0
;
}
...
...
@@ -261,10 +269,12 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
pRead
->
curVersion
=
-
1
;
ASSERT
(
0
);
return
-
1
;
}
pRead
->
curVersion
++
;
wDebug
(
"version advance to %ld, skip fetch"
,
pRead
->
curVersion
);
return
0
;
}
...
...
@@ -355,22 +365,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
return
0
;
}
int32_t
walReadWithHandle_s
(
SWalReader
*
pRead
,
int64_t
ver
,
SWalCont
**
ppHead
)
{
taosThreadMutexLock
(
&
pRead
->
mutex
);
if
(
walReadVer
(
pRead
,
ver
)
<
0
)
{
taosThreadMutexUnlock
(
&
pRead
->
mutex
);
return
-
1
;
}
*
ppHead
=
taosMemoryMalloc
(
sizeof
(
SWalCont
)
+
pRead
->
pHead
->
head
.
bodyLen
);
if
(
*
ppHead
==
NULL
)
{
taosThreadMutexUnlock
(
&
pRead
->
mutex
);
return
-
1
;
}
memcpy
(
*
ppHead
,
&
pRead
->
pHead
->
head
,
sizeof
(
SWalCont
)
+
pRead
->
pHead
->
head
.
bodyLen
);
taosThreadMutexUnlock
(
&
pRead
->
mutex
);
return
0
;
}
int32_t
walReadVer
(
SWalReader
*
pRead
,
int64_t
ver
)
{
int64_t
code
;
...
...
tests/script/jenkins/basic.txt
浏览文件 @
ebc9e438
...
...
@@ -80,7 +80,7 @@
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic4.sim
#
./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic5.sim
# ---- show
...
...
tests/system-test/failed.txt
浏览文件 @
ebc9e438
#python3 ./test.py -f 2-query/last.py -Q 3
#./test.sh -f tsim/mnode/basic4.sim
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录