Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8afb5454
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8afb5454
编写于
7月 10, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(stream)
上级
e9077c03
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
40 addition
and
36 deletion
+40
-36
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+3
-3
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+28
-21
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+9
-12
未找到文件。
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
8afb5454
...
...
@@ -22,8 +22,8 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
while
(
1
)
{
if
(
walFetchHead
(
pHandle
->
pWalReader
,
offset
,
*
ppCkHead
)
<
0
)
{
tqDebug
(
"tmq poll: consumer:%"
PRId64
", (epoch %d) vgId:%d offset %"
PRId64
", no more log to return"
,
pHandle
->
consumerId
,
pHandle
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
offset
);
tqDebug
(
"tmq poll: consumer:%"
PRId64
", (epoch %d) vgId:%d offset %"
PRId64
", no more log to return"
,
pHandle
->
consumerId
,
pHandle
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
offset
);
*
fetchOffset
=
offset
-
1
;
code
=
-
1
;
goto
END
;
...
...
@@ -119,7 +119,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while
(
1
)
{
if
(
!
fromProcessedMsg
)
{
if
(
walNextValidMsg
(
pReader
->
pWalReader
)
<
0
)
{
pReader
->
ver
=
pReader
->
pWalReader
->
curVersion
;
pReader
->
ver
=
pReader
->
pWalReader
->
curVersion
-
pReader
->
pWalReader
->
curInvalid
;
ret
->
offset
.
type
=
TMQ_OFFSET__LOG
;
ret
->
offset
.
version
=
pReader
->
ver
;
ret
->
fetchType
=
FETCH_TYPE__NONE
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
8afb5454
...
...
@@ -40,8 +40,8 @@ static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta*
const
char
*
dbName
);
static
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
static
bool
processBlockWithProbability
(
const
SSampleExecInfo
*
pInfo
);
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
static
bool
processBlockWithProbability
(
const
SSampleExecInfo
*
pInfo
);
bool
processBlockWithProbability
(
const
SSampleExecInfo
*
pInfo
)
{
#if 0
...
...
@@ -265,7 +265,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pTableScanInfo
->
pseudoSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -303,7 +304,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
}
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
)
{
SSDataBlock
*
pBlock
,
const
char
*
idStr
)
{
// currently only the tbname pseudo column
if
(
numOfPseudoExpr
==
0
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -313,7 +314,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
metaReaderInit
(
&
mr
,
pHandle
->
meta
,
0
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
pBlock
->
info
.
uid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
pBlock
->
info
.
uid
,
tstrerror
(
terrno
),
idStr
);
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
pBlock
->
info
.
uid
,
tstrerror
(
terrno
),
idStr
);
metaReaderClear
(
&
mr
);
return
terrno
;
}
...
...
@@ -697,7 +698,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con
metaReaderInit
(
&
mr
,
pMeta
,
0
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
uid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
uid
,
tstrerror
(
terrno
),
idstr
);
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
uid
,
tstrerror
(
terrno
),
idstr
);
metaReaderClear
(
&
mr
);
return
terrno
;
}
...
...
@@ -711,7 +712,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con
uint64_t
suid
=
mr
.
me
.
ctbEntry
.
suid
;
code
=
metaGetTableEntryByUid
(
&
mr
,
suid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
suid
,
tstrerror
(
terrno
),
idstr
);
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
suid
,
tstrerror
(
terrno
),
idstr
);
metaReaderClear
(
&
mr
);
return
terrno
;
}
...
...
@@ -738,12 +739,13 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
}
SBlockDistInfo
*
pBlockScanInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STableBlockDistInfo
blockDistInfo
=
{.
minRows
=
INT_MAX
,
.
maxRows
=
INT_MIN
};
int32_t
code
=
doGetTableRowSize
(
pBlockScanInfo
->
readHandle
.
meta
,
pBlockScanInfo
->
uid
,
&
blockDistInfo
.
rowSize
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
doGetTableRowSize
(
pBlockScanInfo
->
readHandle
.
meta
,
pBlockScanInfo
->
uid
,
&
blockDistInfo
.
rowSize
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
longjmp
(
pTaskInfo
->
env
,
code
);
}
tsdbGetFileBlocksDistInfo
(
pBlockScanInfo
->
pHandle
,
&
blockDistInfo
);
...
...
@@ -938,7 +940,8 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
setGroupId
(
pInfo
,
pSDB
,
GROUPID_COLUMN_INDEX
,
*
pRowIndex
);
(
*
pRowIndex
)
+=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
NULL
,
pSDB
->
info
.
rows
,
*
pRowIndex
,
gap
,
NULL
);
}
else
{
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsCols
[
*
pRowIndex
],
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
TSDB_ORDER_ASC
);
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsCols
[
*
pRowIndex
],
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
TSDB_ORDER_ASC
);
setGroupId
(
pInfo
,
pSDB
,
GROUPID_COLUMN_INDEX
,
*
pRowIndex
);
(
*
pRowIndex
)
+=
getNumOfRowsInTimeWindow
(
&
pSDB
->
info
,
tsCols
,
*
pRowIndex
,
win
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
...
...
@@ -1219,7 +1222,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column
if
(
pInfo
->
numOfPseudoExpr
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -1269,7 +1273,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;*/
/*} else {*/
pTaskInfo
->
streamInfo
.
lastStatus
=
ret
.
offset
;
ASSERT
(
pTaskInfo
->
streamInfo
.
lastStatus
.
version
>=
pTaskInfo
->
streamInfo
.
prepareStatus
.
version
);
ASSERT
(
pTaskInfo
->
streamInfo
.
lastStatus
.
version
+
1
>=
pTaskInfo
->
streamInfo
.
prepareStatus
.
version
);
/*}*/
return
NULL
;
}
else
{
...
...
@@ -1447,7 +1451,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// currently only the tbname pseudo column
if
(
pInfo
->
numOfPseudoExpr
>
0
)
{
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -1485,7 +1490,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return
(
pBlockInfo
->
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
else
if
(
pInfo
->
blockType
==
STREAM_INPUT__TABLE_SCAN
)
{
ASSERT
(
0
);
/*ASSERT(0);*/
// check reader last status
// if not match, reset status
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
pTableScanOp
);
...
...
@@ -1878,9 +1883,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
metaReaderInit
(
&
mr
,
pInfo
->
readHandle
.
meta
,
0
);
uint64_t
suid
=
pInfo
->
pCur
->
mr
.
me
.
ctbEntry
.
suid
;
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
suid
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
suid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get super table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
suid
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
qError
(
"failed to get super table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
suid
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
metaReaderClear
(
&
mr
);
metaCloseTbCursor
(
pInfo
->
pCur
);
pInfo
->
pCur
=
NULL
;
...
...
@@ -2280,9 +2286,10 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
while
(
pInfo
->
curPos
<
size
&&
count
<
pOperator
->
resultInfo
.
capacity
)
{
STableKeyInfo
*
item
=
taosArrayGet
(
pInfo
->
pTableList
->
pTableList
,
pInfo
->
curPos
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
item
->
uid
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
item
->
uid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
item
->
uid
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
item
->
uid
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
metaReaderClear
(
&
mr
);
longjmp
(
pTaskInfo
->
env
,
terrno
);
}
...
...
@@ -2570,8 +2577,8 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
numOfPseudoExpr
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
pPseudoExpr
,
pTableScanInfo
->
numOfPseudoExpr
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
pPseudoExpr
,
pTableScanInfo
->
numOfPseudoExpr
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
source/libs/wal/src/walRead.c
浏览文件 @
8afb5454
...
...
@@ -162,8 +162,8 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
pRead
->
curVersion
=
ver
;
if
(
ver
>
pWal
->
vers
.
lastVer
||
ver
<
pWal
->
vers
.
firstVer
)
{
w
Error
(
"vgId:$d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
w
Debug
(
"vgId:%d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
...
...
@@ -176,13 +176,13 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
if
(
pRead
->
curFileFirstVer
!=
pRet
->
firstVer
)
{
// error code set inner
// error code
was
set inner
if
(
walReadChangeFile
(
pRead
,
pRet
->
firstVer
)
<
0
)
{
return
-
1
;
}
}
// error code set inner
// error code
was
set inner
if
(
walReadSeekFilePos
(
pRead
,
pRet
->
firstVer
,
ver
)
<
0
)
{
return
-
1
;
}
...
...
@@ -314,8 +314,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
code
=
walValidHeadCksum
(
pHead
);
if
(
code
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
...
...
@@ -395,8 +394,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
}
if
(
ver
>
pRead
->
pWal
->
vers
.
lastVer
||
ver
<
pRead
->
pWal
->
vers
.
firstVer
)
{
wError
(
"vgId:%d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
lastVer
);
wError
(
"vgId:%d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
lastVer
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
...
...
@@ -416,8 +415,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
code
=
walValidHeadCksum
(
pRead
->
pHead
);
if
(
code
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
...
...
@@ -453,8 +451,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
code
=
walValidBodyCksum
(
pRead
->
pHead
);
if
(
code
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
pRead
->
curInvalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录