Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
095f6aa4
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
095f6aa4
编写于
7月 23, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(tmq): correctly set reader status
上级
168340da
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
42 addition
and
12 deletion
+42
-12
include/libs/wal/wal.h
include/libs/wal/wal.h
+1
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+8
-0
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+10
-1
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+4
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+11
-8
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+8
-2
未找到文件。
include/libs/wal/wal.h
浏览文件 @
095f6aa4
...
...
@@ -135,6 +135,7 @@ typedef struct {
int64_t
curVersion
;
int64_t
capacity
;
int8_t
curInvalid
;
int8_t
curStopped
;
TdThreadMutex
mutex
;
SWalFilterCond
cond
;
SWalCkHead
*
pHead
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
095f6aa4
...
...
@@ -138,6 +138,14 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
0
);
}
if
(
pRsp
->
reqOffset
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
pRsp
->
blockNum
>
0
)
{
ASSERT
(
pRsp
->
rspOffset
.
version
>
pRsp
->
reqOffset
.
version
);
}
else
{
ASSERT
(
pRsp
->
rspOffset
.
version
>=
pRsp
->
reqOffset
.
version
);
}
}
int32_t
len
;
int32_t
code
;
tEncodeSize
(
tEncodeSMqDataRsp
,
pRsp
,
len
,
code
);
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
095f6aa4
...
...
@@ -65,12 +65,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
qTaskInfo_t
task
=
pExec
->
execCol
.
task
;
if
(
qStreamPrepareScan
(
task
,
pOffset
)
<
0
)
{
tqDebug
(
"prepare scan failed, return"
);
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
pRsp
->
rspOffset
=
*
pOffset
;
return
0
;
}
else
{
tqOffsetResetToLog
(
pOffset
,
pHandle
->
snapshotVer
);
if
(
qStreamPrepareScan
(
task
,
pOffset
)
<
0
)
{
tqDebug
(
"prepare scan failed, return"
);
pRsp
->
rspOffset
=
*
pOffset
;
return
0
;
}
...
...
@@ -126,9 +128,16 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
ASSERT
(
pRsp
->
rspOffset
.
type
!=
0
);
#if 0
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
ASSERT
(
pRsp
->
rspOffset
.
version
+
1
>=
pRsp
->
reqOffset
.
version
);
if (pRsp->blockNum > 0) {
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
} else {
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
}
}
#endif
tqDebug
(
"task exec exited"
);
break
;
}
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
095f6aa4
...
...
@@ -132,10 +132,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while
(
1
)
{
if
(
!
fromProcessedMsg
)
{
if
(
walNextValidMsg
(
pReader
->
pWalReader
)
<
0
)
{
pReader
->
ver
=
pReader
->
pWalReader
->
curVersion
-
pReader
->
pWalReader
->
curInvalid
;
pReader
->
ver
=
pReader
->
pWalReader
->
curVersion
-
(
pReader
->
pWalReader
->
curInvalid
|
pReader
->
pWalReader
->
curStopped
);
ret
->
offset
.
type
=
TMQ_OFFSET__LOG
;
ret
->
offset
.
version
=
pReader
->
ver
;
ret
->
fetchType
=
FETCH_TYPE__NONE
;
tqDebug
(
"return offset %ld, no more valid"
,
ret
->
offset
.
version
);
ASSERT
(
ret
->
offset
.
version
>=
0
);
return
-
1
;
}
...
...
@@ -167,6 +169,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
ret
->
offset
.
version
=
pReader
->
ver
;
ASSERT
(
pReader
->
ver
>=
0
);
ret
->
fetchType
=
FETCH_TYPE__NONE
;
tqDebug
(
"return offset %ld, processed finish"
,
ret
->
offset
.
version
);
return
0
;
}
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
095f6aa4
...
...
@@ -946,7 +946,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
}
blockDataCleanup
(
pDestBlock
);
int32_t
code
=
blockDataEnsureCapacity
(
pDestBlock
,
pSrcBlock
->
info
.
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
ASSERT
(
taosArrayGetSize
(
pSrcBlock
->
pDataBlock
)
>=
3
);
...
...
@@ -994,16 +994,16 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
SColumnInfoData
*
pTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
uint64_t
*
uidCol
=
(
uint64_t
*
)
pUidCol
->
pData
;
uint64_t
*
uidCol
=
(
uint64_t
*
)
pUidCol
->
pData
;
ASSERT
(
pTsCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TSKEY
*
tsCol
=
(
TSKEY
*
)
pTsCol
->
pData
;
TSKEY
*
tsCol
=
(
TSKEY
*
)
pTsCol
->
pData
;
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pGpCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pCalStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pCalEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
uint64_t
groupId
=
getGroupId
(
pInfo
->
pTableScanOp
,
uidCol
[
0
]);
for
(
int32_t
i
=
0
;
i
<
rows
;
)
{
uint64_t
groupId
=
getGroupId
(
pInfo
->
pTableScanOp
,
uidCol
[
0
]);
for
(
int32_t
i
=
0
;
i
<
rows
;)
{
colDataAppend
(
pCalStartTsCol
,
pDestBlock
->
info
.
rows
,
(
const
char
*
)(
tsCol
+
i
),
false
);
STimeWindow
win
=
getSlidingWindow
(
tsCol
,
&
pInfo
->
interval
,
&
pSrcBlock
->
info
,
&
i
);
colDataAppend
(
pCalEndTsCol
,
pDestBlock
->
info
.
rows
,
(
const
char
*
)(
tsCol
+
i
-
1
),
false
);
...
...
@@ -1167,8 +1167,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return
NULL
;
}
else
if
(
ret
.
fetchType
==
FETCH_TYPE__NONE
)
{
pTaskInfo
->
streamInfo
.
lastStatus
=
ret
.
offset
;
ASSERT
(
pTaskInfo
->
streamInfo
.
lastStatus
.
version
+
1
>=
pTaskInfo
->
streamInfo
.
prepareStatus
.
version
);
qDebug
(
"stream scan log return null"
);
ASSERT
(
pTaskInfo
->
streamInfo
.
lastStatus
.
version
>=
pTaskInfo
->
streamInfo
.
prepareStatus
.
version
);
ASSERT
(
pTaskInfo
->
streamInfo
.
lastStatus
.
version
+
1
==
pInfo
->
tqReader
->
pWalReader
->
curVersion
);
char
formatBuf
[
80
];
tFormatOffset
(
formatBuf
,
80
,
&
ret
.
offset
);
qDebug
(
"stream scan log return null, offset %s"
,
formatBuf
);
return
NULL
;
}
else
{
ASSERT
(
0
);
...
...
@@ -1272,7 +1275,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
default:
break
;
}
SStreamAggSupporter
*
pSup
=
pInfo
->
sessionSup
.
pStreamAggSup
;
if
(
isStateWindow
(
pInfo
)
&&
pSup
->
pScanBlock
->
info
.
rows
>
0
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER_RANGE
;
...
...
source/libs/wal/src/walRead.c
浏览文件 @
095f6aa4
...
...
@@ -21,7 +21,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead);
static
int32_t
walSkipFetchBodyNew
(
SWalReader
*
pRead
);
SWalReader
*
walOpenReader
(
SWal
*
pWal
,
SWalFilterCond
*
cond
)
{
SWalReader
*
pRead
=
taosMemory
Malloc
(
sizeof
(
SWalReader
));
SWalReader
*
pRead
=
taosMemory
Calloc
(
1
,
sizeof
(
SWalReader
));
if
(
pRead
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -75,6 +75,7 @@ int32_t walNextValidMsg(SWalReader *pRead) {
wDebug
(
"vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld"
,
pRead
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
appliedVer
,
endVer
);
pRead
->
curStopped
=
0
;
while
(
fetchVer
<=
endVer
)
{
if
(
walFetchHeadNew
(
pRead
,
fetchVer
)
<
0
)
{
return
-
1
;
...
...
@@ -93,6 +94,7 @@ int32_t walNextValidMsg(SWalReader *pRead) {
ASSERT
(
fetchVer
==
pRead
->
curVersion
);
}
}
pRead
->
curStopped
=
1
;
return
-
1
;
}
...
...
@@ -221,6 +223,8 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t
contLen
;
bool
seeked
=
false
;
wDebug
(
"vgId:%d, wal starts to fetch head %d"
,
pRead
->
pWal
->
cfg
.
vgId
,
fetchVer
);
if
(
pRead
->
curInvalid
||
pRead
->
curVersion
!=
fetchVer
)
{
if
(
walReadSeekVer
(
pRead
,
fetchVer
)
<
0
)
{
ASSERT
(
0
);
...
...
@@ -257,6 +261,8 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
SWalCont
*
pReadHead
=
&
pRead
->
pHead
->
head
;
int64_t
ver
=
pReadHead
->
version
;
wDebug
(
"vgId:%d, wal starts to fetch body %ld"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
if
(
pRead
->
capacity
<
pReadHead
->
bodyLen
)
{
void
*
ptr
=
taosMemoryRealloc
(
pRead
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
if
(
ptr
==
NULL
)
{
...
...
@@ -300,8 +306,8 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
return
-
1
;
}
wDebug
(
"version %ld is fetched, cursor advance"
,
ver
);
pRead
->
curVersion
=
ver
+
1
;
wDebug
(
"version advance to %ld, fetch body"
,
pRead
->
curVersion
);
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录