Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e1bd65ec
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
e1bd65ec
编写于
4月 27, 2023
作者:
H
Haojun Liao
提交者:
GitHub
4月 27, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21105 from taosdata/fix/liaohj_main
fix(query): check the init status for pReader->status.merger TD-23903
上级
4e07f546
3093726c
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
130 addition
and
95 deletion
+130
-95
include/common/tmsg.h
include/common/tmsg.h
+1
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+1
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+1
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+4
-7
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+2
-2
source/dnode/vnode/src/sma/smaTimeRange.c
source/dnode/vnode/src/sma/smaTimeRange.c
+1
-1
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+16
-18
source/dnode/vnode/src/tq/tqScan.c
source/dnode/vnode/src/tq/tqScan.c
+2
-2
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+3
-3
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+80
-39
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-1
source/libs/executor/src/dataInserter.c
source/libs/executor/src/dataInserter.c
+3
-3
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+14
-15
source/libs/parser/src/parInsertUtil.c
source/libs/parser/src/parInsertUtil.c
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
e1bd65ec
...
@@ -3424,7 +3424,7 @@ typedef struct {
...
@@ -3424,7 +3424,7 @@ typedef struct {
int32_t
tEncodeSSubmitReq2
(
SEncoder
*
pCoder
,
const
SSubmitReq2
*
pReq
);
int32_t
tEncodeSSubmitReq2
(
SEncoder
*
pCoder
,
const
SSubmitReq2
*
pReq
);
int32_t
tDecodeSSubmitReq2
(
SDecoder
*
pCoder
,
SSubmitReq2
*
pReq
);
int32_t
tDecodeSSubmitReq2
(
SDecoder
*
pCoder
,
SSubmitReq2
*
pReq
);
void
tDestroySSubmitTbData
(
SSubmitTbData
*
pTbData
,
int32_t
flag
);
void
tDestroySSubmitTbData
(
SSubmitTbData
*
pTbData
,
int32_t
flag
);
void
tDestroySSubmitReq
2
(
SSubmitReq2
*
pReq
,
int32_t
flag
);
void
tDestroySSubmitReq
(
SSubmitReq2
*
pReq
,
int32_t
flag
);
typedef
struct
{
typedef
struct
{
int32_t
affectedRows
;
int32_t
affectedRows
;
...
...
source/common/src/tdatablock.c
浏览文件 @
e1bd65ec
...
@@ -2388,7 +2388,7 @@ _end:
...
@@ -2388,7 +2388,7 @@ _end:
if
(
terrno
!=
0
)
{
if
(
terrno
!=
0
)
{
*
ppReq
=
NULL
;
*
ppReq
=
NULL
;
if
(
pReq
)
{
if
(
pReq
)
{
tDestroySSubmitReq
2
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
taosMemoryFreeClear
(
pReq
);
taosMemoryFreeClear
(
pReq
);
}
}
...
...
source/common/src/tmsg.c
浏览文件 @
e1bd65ec
...
@@ -7437,7 +7437,7 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
...
@@ -7437,7 +7437,7 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
}
}
}
}
void
tDestroySSubmitReq
2
(
SSubmitReq2
*
pReq
,
int32_t
flag
)
{
void
tDestroySSubmitReq
(
SSubmitReq2
*
pReq
,
int32_t
flag
)
{
if
(
pReq
->
aSubmitTbData
==
NULL
)
return
;
if
(
pReq
->
aSubmitTbData
==
NULL
)
return
;
int32_t
nSubmitTbData
=
TARRAY_SIZE
(
pReq
->
aSubmitTbData
);
int32_t
nSubmitTbData
=
TARRAY_SIZE
(
pReq
->
aSubmitTbData
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
e1bd65ec
...
@@ -259,17 +259,14 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
...
@@ -259,17 +259,14 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t
tqReaderRemoveTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
);
int32_t
tqReaderRemoveTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
);
int32_t
tqSeekVer
(
STqReader
*
pReader
,
int64_t
ver
,
const
char
*
id
);
int32_t
tqSeekVer
(
STqReader
*
pReader
,
int64_t
ver
,
const
char
*
id
);
void
tqNextBlock
(
STqReader
*
pReader
,
SFetchRet
*
ret
);
int32_t
tqNextBlock
(
STqReader
*
pReader
,
SSDataBlock
*
pBlock
);
int32_t
extractSubmitMsgFromWal
(
SWalReader
*
pReader
,
SPackedData
*
pPackedData
);
int32_t
extractSubmitMsgFromWal
(
SWalReader
*
pReader
,
SPackedData
*
pPackedData
);
int32_t
tqReaderSetSubmitMsg
(
STqReader
*
pReader
,
void
*
msgStr
,
int32_t
msgLen
,
int64_t
ver
);
int32_t
tqReaderSetSubmitMsg
(
STqReader
*
pReader
,
void
*
msgStr
,
int32_t
msgLen
,
int64_t
ver
);
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
bool
tqNextBlockImpl
(
STqReader
*
pReader
);
bool
tqNextDataBlock
(
STqReader
*
pReader
);
bool
tqNextDataBlockFilterOut
(
STqReader
*
pReader
,
SHashObj
*
filterOutUids
);
bool
tqNextDataBlockFilterOut2
(
STqReader
*
pReader
,
SHashObj
*
filterOutUids
);
int32_t
tqRetrieveDataBlock
(
SSDataBlock
*
pBlock
,
STqReader
*
pReader
,
SSubmitTbData
**
pSubmitTbDataRet
);
int32_t
tqRetrieveDataBlock2
(
SSDataBlock
*
pBlock
,
STqReader
*
pReader
,
SSubmitTbData
**
pSubmitTbDataRet
);
int32_t
tqRetrieveTaosxBlock2
(
STqReader
*
pReader
,
SArray
*
blocks
,
SArray
*
schemas
,
SSubmitTbData
**
pSubmitTbDataRet
);
int32_t
tqRetrieveTaosxBlock2
(
STqReader
*
pReader
,
SArray
*
blocks
,
SArray
*
schemas
,
SSubmitTbData
**
pSubmitTbDataRet
);
// int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
// int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);
int32_t
vnodeEnqueueStreamMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeEnqueueStreamMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
e1bd65ec
...
@@ -684,7 +684,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
...
@@ -684,7 +684,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
}
}
if
(
pReq
&&
tdProcessSubmitReq
(
sinkTsdb
,
output
->
info
.
version
,
pReq
)
<
0
)
{
if
(
pReq
&&
tdProcessSubmitReq
(
sinkTsdb
,
output
->
info
.
version
,
pReq
)
<
0
)
{
tDestroySSubmitReq
2
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
taosMemoryFree
(
pReq
);
taosMemoryFree
(
pReq
);
smaError
(
"vgId:%d, process submit req for rsma suid:%"
PRIu64
", uid:%"
PRIu64
" level %"
PRIi8
smaError
(
"vgId:%d, process submit req for rsma suid:%"
PRIu64
", uid:%"
PRIu64
" level %"
PRIi8
" failed since %s"
,
" failed since %s"
,
...
@@ -696,7 +696,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
...
@@ -696,7 +696,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
SMA_VID
(
pSma
),
suid
,
output
->
info
.
id
.
groupId
,
pItem
->
level
,
output
->
info
.
version
);
SMA_VID
(
pSma
),
suid
,
output
->
info
.
id
.
groupId
,
pItem
->
level
,
output
->
info
.
version
);
if
(
pReq
)
{
if
(
pReq
)
{
tDestroySSubmitReq
2
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
taosMemoryFree
(
pReq
);
taosMemoryFree
(
pReq
);
}
}
}
}
...
...
source/dnode/vnode/src/sma/smaTimeRange.c
浏览文件 @
e1bd65ec
...
@@ -332,7 +332,7 @@ _end:
...
@@ -332,7 +332,7 @@ _end:
taosArrayDestroy
(
tagArray
);
taosArrayDestroy
(
tagArray
);
taosArrayDestroy
(
pVals
);
taosArrayDestroy
(
pVals
);
if
(
pReq
)
{
if
(
pReq
)
{
tDestroySSubmitReq
2
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
taosMemoryFree
(
pReq
);
taosMemoryFree
(
pReq
);
}
}
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
e1bd65ec
...
@@ -288,7 +288,7 @@ void tqCloseReader(STqReader* pReader) {
...
@@ -288,7 +288,7 @@ void tqCloseReader(STqReader* pReader) {
}
}
// free hash
// free hash
taosHashCleanup
(
pReader
->
tbIdHash
);
taosHashCleanup
(
pReader
->
tbIdHash
);
tDestroySSubmitReq
2
(
&
pReader
->
submit
,
TSDB_MSG_FLG_DECODE
);
tDestroySSubmitReq
(
&
pReader
->
submit
,
TSDB_MSG_FLG_DECODE
);
taosMemoryFree
(
pReader
);
taosMemoryFree
(
pReader
);
}
}
...
@@ -322,12 +322,11 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) {
...
@@ -322,12 +322,11 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) {
return
0
;
return
0
;
}
}
void
tqNextBlock
(
STqReader
*
pReader
,
SFetchRet
*
ret
)
{
int32_t
tqNextBlock
(
STqReader
*
pReader
,
SSDataBlock
*
pBlock
)
{
while
(
1
)
{
while
(
1
)
{
if
(
pReader
->
msg2
.
msgStr
==
NULL
)
{
if
(
pReader
->
msg2
.
msgStr
==
NULL
)
{
if
(
walNextValidMsg
(
pReader
->
pWalReader
)
<
0
)
{
if
(
walNextValidMsg
(
pReader
->
pWalReader
)
<
0
)
{
ret
->
fetchType
=
FETCH_TYPE__NONE
;
return
FETCH_TYPE__NONE
;
return
;
}
}
void
*
pBody
=
POINTER_SHIFT
(
pReader
->
pWalReader
->
pHead
->
head
.
body
,
sizeof
(
SSubmitReq2Msg
));
void
*
pBody
=
POINTER_SHIFT
(
pReader
->
pWalReader
->
pHead
->
head
.
body
,
sizeof
(
SSubmitReq2Msg
));
...
@@ -337,15 +336,14 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) {
...
@@ -337,15 +336,14 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) {
tqReaderSetSubmitMsg
(
pReader
,
pBody
,
bodyLen
,
ver
);
tqReaderSetSubmitMsg
(
pReader
,
pBody
,
bodyLen
,
ver
);
}
}
while
(
tqNext
DataBlock
(
pReader
))
{
while
(
tqNext
BlockImpl
(
pReader
))
{
memset
(
&
ret
->
data
,
0
,
sizeof
(
SSDataBlock
));
memset
(
pBlock
,
0
,
sizeof
(
SSDataBlock
));
int32_t
code
=
tqRetrieveDataBlock
2
(
&
ret
->
data
,
pReader
,
NULL
);
int32_t
code
=
tqRetrieveDataBlock
(
pBlock
,
pReader
,
NULL
);
if
(
code
!=
0
||
ret
->
data
.
info
.
rows
==
0
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
||
pBlock
->
info
.
rows
==
0
)
{
continue
;
continue
;
}
}
ret
->
fetchType
=
FETCH_TYPE__DATA
;
return
FETCH_TYPE__DATA
;
return
;
}
}
}
}
}
}
...
@@ -367,7 +365,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
...
@@ -367,7 +365,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
return
0
;
return
0
;
}
}
bool
tqNext
DataBlock
(
STqReader
*
pReader
)
{
bool
tqNext
BlockImpl
(
STqReader
*
pReader
)
{
if
(
pReader
->
msg2
.
msgStr
==
NULL
)
{
if
(
pReader
->
msg2
.
msgStr
==
NULL
)
{
return
false
;
return
false
;
}
}
...
@@ -387,20 +385,20 @@ bool tqNextDataBlock(STqReader* pReader) {
...
@@ -387,20 +385,20 @@ bool tqNextDataBlock(STqReader* pReader) {
tqDebug
(
"tq reader block found, ver:%"
PRId64
", uid:%"
PRId64
,
pReader
->
msg2
.
ver
,
pSubmitTbData
->
uid
);
tqDebug
(
"tq reader block found, ver:%"
PRId64
", uid:%"
PRId64
,
pReader
->
msg2
.
ver
,
pSubmitTbData
->
uid
);
return
true
;
return
true
;
}
else
{
}
else
{
tqDebug
(
"tq reader discard block, uid:%"
PRId64
", continue"
,
pSubmitTbData
->
uid
);
tqDebug
(
"tq reader discard
submit
block, uid:%"
PRId64
", continue"
,
pSubmitTbData
->
uid
);
}
}
pReader
->
nextBlk
++
;
pReader
->
nextBlk
++
;
}
}
tDestroySSubmitReq
2
(
&
pReader
->
submit
,
TSDB_MSG_FLG_DECODE
);
tDestroySSubmitReq
(
&
pReader
->
submit
,
TSDB_MSG_FLG_DECODE
);
pReader
->
nextBlk
=
0
;
pReader
->
nextBlk
=
0
;
pReader
->
msg2
.
msgStr
=
NULL
;
pReader
->
msg2
.
msgStr
=
NULL
;
return
false
;
return
false
;
}
}
bool
tqNextDataBlockFilterOut
2
(
STqReader
*
pReader
,
SHashObj
*
filterOutUids
)
{
bool
tqNextDataBlockFilterOut
(
STqReader
*
pReader
,
SHashObj
*
filterOutUids
)
{
if
(
pReader
->
msg2
.
msgStr
==
NULL
)
return
false
;
if
(
pReader
->
msg2
.
msgStr
==
NULL
)
return
false
;
int32_t
blockSz
=
taosArrayGetSize
(
pReader
->
submit
.
aSubmitTbData
);
int32_t
blockSz
=
taosArrayGetSize
(
pReader
->
submit
.
aSubmitTbData
);
...
@@ -415,7 +413,7 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
...
@@ -415,7 +413,7 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
pReader
->
nextBlk
++
;
pReader
->
nextBlk
++
;
}
}
tDestroySSubmitReq
2
(
&
pReader
->
submit
,
TSDB_MSG_FLG_DECODE
);
tDestroySSubmitReq
(
&
pReader
->
submit
,
TSDB_MSG_FLG_DECODE
);
pReader
->
nextBlk
=
0
;
pReader
->
nextBlk
=
0
;
pReader
->
msg2
.
msgStr
=
NULL
;
pReader
->
msg2
.
msgStr
=
NULL
;
...
@@ -451,7 +449,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
...
@@ -451,7 +449,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
return
0
;
return
0
;
}
}
int32_t
tqRetrieveDataBlock
2
(
SSDataBlock
*
pBlock
,
STqReader
*
pReader
,
SSubmitTbData
**
pSubmitTbDataRet
)
{
int32_t
tqRetrieveDataBlock
(
SSDataBlock
*
pBlock
,
STqReader
*
pReader
,
SSubmitTbData
**
pSubmitTbDataRet
)
{
tqDebug
(
"tq reader retrieve data block %p, index:%d"
,
pReader
->
msg2
.
msgStr
,
pReader
->
nextBlk
);
tqDebug
(
"tq reader retrieve data block %p, index:%d"
,
pReader
->
msg2
.
msgStr
,
pReader
->
nextBlk
);
SSubmitTbData
*
pSubmitTbData
=
taosArrayGet
(
pReader
->
submit
.
aSubmitTbData
,
pReader
->
nextBlk
);
SSubmitTbData
*
pSubmitTbData
=
taosArrayGet
(
pReader
->
submit
.
aSubmitTbData
,
pReader
->
nextBlk
);
pReader
->
nextBlk
++
;
pReader
->
nextBlk
++
;
...
@@ -560,7 +558,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
...
@@ -560,7 +558,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
int32_t
sourceIdx
=
0
;
int32_t
sourceIdx
=
0
;
while
(
targetIdx
<
colActual
)
{
while
(
targetIdx
<
colActual
)
{
if
(
sourceIdx
>=
numOfCols
){
if
(
sourceIdx
>=
numOfCols
){
tqError
(
"tqRetrieveDataBlock
2
sourceIdx:%d >= numOfCols:%d"
,
sourceIdx
,
numOfCols
);
tqError
(
"tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d"
,
sourceIdx
,
numOfCols
);
goto
FAIL
;
goto
FAIL
;
}
}
SColData
*
pCol
=
taosArrayGet
(
pCols
,
sourceIdx
);
SColData
*
pCol
=
taosArrayGet
(
pCols
,
sourceIdx
);
...
@@ -568,7 +566,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
...
@@ -568,7 +566,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
SColVal
colVal
;
SColVal
colVal
;
if
(
pCol
->
nVal
!=
numOfRows
){
if
(
pCol
->
nVal
!=
numOfRows
){
tqError
(
"tqRetrieveDataBlock
2
pCol->nVal:%d != numOfRows:%d"
,
pCol
->
nVal
,
numOfRows
);
tqError
(
"tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d"
,
pCol
->
nVal
,
numOfRows
);
goto
FAIL
;
goto
FAIL
;
}
}
...
...
source/dnode/vnode/src/tq/tqScan.c
浏览文件 @
e1bd65ec
...
@@ -203,7 +203,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
...
@@ -203,7 +203,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
STqReader
*
pReader
=
pExec
->
pTqReader
;
STqReader
*
pReader
=
pExec
->
pTqReader
;
tqReaderSetSubmitMsg
(
pReader
,
submit
.
msgStr
,
submit
.
msgLen
,
submit
.
ver
);
tqReaderSetSubmitMsg
(
pReader
,
submit
.
msgStr
,
submit
.
msgLen
,
submit
.
ver
);
while
(
tqNext
DataBlock
(
pReader
))
{
while
(
tqNext
BlockImpl
(
pReader
))
{
taosArrayClear
(
pBlocks
);
taosArrayClear
(
pBlocks
);
taosArrayClear
(
pSchemas
);
taosArrayClear
(
pSchemas
);
SSubmitTbData
*
pSubmitTbDataRet
=
NULL
;
SSubmitTbData
*
pSubmitTbDataRet
=
NULL
;
...
@@ -262,7 +262,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
...
@@ -262,7 +262,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
}
else
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
}
else
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
STqReader
*
pReader
=
pExec
->
pTqReader
;
STqReader
*
pReader
=
pExec
->
pTqReader
;
tqReaderSetSubmitMsg
(
pReader
,
submit
.
msgStr
,
submit
.
msgLen
,
submit
.
ver
);
tqReaderSetSubmitMsg
(
pReader
,
submit
.
msgStr
,
submit
.
msgLen
,
submit
.
ver
);
while
(
tqNextDataBlockFilterOut
2
(
pReader
,
pExec
->
execDb
.
pFilterOutTbUid
))
{
while
(
tqNextDataBlockFilterOut
(
pReader
,
pExec
->
execDb
.
pFilterOutTbUid
))
{
taosArrayClear
(
pBlocks
);
taosArrayClear
(
pBlocks
);
taosArrayClear
(
pSchemas
);
taosArrayClear
(
pSchemas
);
SSubmitTbData
*
pSubmitTbDataRet
=
NULL
;
SSubmitTbData
*
pSubmitTbDataRet
=
NULL
;
...
...
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
e1bd65ec
...
@@ -695,7 +695,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
...
@@ -695,7 +695,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
len
+=
sizeof
(
SSubmitReq2Msg
);
len
+=
sizeof
(
SSubmitReq2Msg
);
pBuf
=
rpcMallocCont
(
len
);
pBuf
=
rpcMallocCont
(
len
);
if
(
NULL
==
pBuf
)
{
if
(
NULL
==
pBuf
)
{
tDestroySSubmitReq
2
(
&
submitReq
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
&
submitReq
,
TSDB_MSG_FLG_ENCODE
);
goto
_end
;
goto
_end
;
}
}
((
SSubmitReq2Msg
*
)
pBuf
)
->
header
.
vgId
=
TD_VID
(
pVnode
);
((
SSubmitReq2Msg
*
)
pBuf
)
->
header
.
vgId
=
TD_VID
(
pVnode
);
...
@@ -707,11 +707,11 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
...
@@ -707,11 +707,11 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tqError
(
"failed to encode submit req since %s"
,
terrstr
());
tqError
(
"failed to encode submit req since %s"
,
terrstr
());
tEncoderClear
(
&
encoder
);
tEncoderClear
(
&
encoder
);
rpcFreeCont
(
pBuf
);
rpcFreeCont
(
pBuf
);
tDestroySSubmitReq
2
(
&
submitReq
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
&
submitReq
,
TSDB_MSG_FLG_ENCODE
);
continue
;
continue
;
}
}
tEncoderClear
(
&
encoder
);
tEncoderClear
(
&
encoder
);
tDestroySSubmitReq
2
(
&
submitReq
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
&
submitReq
,
TSDB_MSG_FLG_ENCODE
);
SRpcMsg
msg
=
{
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_SUBMIT
,
.
msgType
=
TDMT_VND_SUBMIT
,
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
e1bd65ec
...
@@ -246,8 +246,6 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order
...
@@ -246,8 +246,6 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order
static
STableBlockScanInfo
*
getTableBlockScanInfo
(
SSHashObj
*
pTableMap
,
uint64_t
uid
,
const
char
*
id
);
static
STableBlockScanInfo
*
getTableBlockScanInfo
(
SSHashObj
*
pTableMap
,
uint64_t
uid
,
const
char
*
id
);
static
STSchema
*
getLatestTableSchema
(
STsdbReader
*
pReader
,
uint64_t
uid
);
static
bool
outOfTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
)
{
return
(
ts
>
pWindow
->
ekey
)
||
(
ts
<
pWindow
->
skey
);
}
static
bool
outOfTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
)
{
return
(
ts
>
pWindow
->
ekey
)
||
(
ts
<
pWindow
->
skey
);
}
static
int32_t
setColumnIdSlotList
(
SBlockLoadSuppInfo
*
pSupInfo
,
SColumnInfo
*
pCols
,
const
int32_t
*
pSlotIdList
,
static
int32_t
setColumnIdSlotList
(
SBlockLoadSuppInfo
*
pSupInfo
,
SColumnInfo
*
pCols
,
const
int32_t
*
pSlotIdList
,
...
@@ -1354,16 +1352,40 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
...
@@ -1354,16 +1352,40 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
FORCE_INLINE
STSchema
*
getTableSchemaImpl
(
STsdbReader
*
pReader
,
uint64_t
uid
)
{
ASSERT
(
pReader
->
pSchema
==
NULL
);
int32_t
code
=
metaGetTbTSchemaEx
(
pReader
->
pTsdb
->
pVnode
->
pMeta
,
pReader
->
suid
,
uid
,
-
1
,
&
pReader
->
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pReader
->
pSchema
==
NULL
)
{
terrno
=
code
;
tsdbError
(
"failed to get table schema, uid:%"
PRIu64
", it may have been dropped, ver:-1, %s"
,
uid
,
pReader
->
idStr
);
return
NULL
;
}
code
=
tsdbRowMergerInit
(
&
pReader
->
status
.
merger
,
pReader
->
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
tsdbError
(
"failed to init merger, code:%s, %s"
,
tstrerror
(
code
),
pReader
->
idStr
);
return
NULL
;
}
return
pReader
->
pSchema
;
}
static
int32_t
doLoadFileBlockData
(
STsdbReader
*
pReader
,
SDataBlockIter
*
pBlockIter
,
SBlockData
*
pBlockData
,
static
int32_t
doLoadFileBlockData
(
STsdbReader
*
pReader
,
SDataBlockIter
*
pBlockIter
,
SBlockData
*
pBlockData
,
uint64_t
uid
)
{
uint64_t
uid
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int64_t
st
=
taosGetTimestampUs
();
STSchema
*
pSchema
=
pReader
->
pSchema
;
int64_t
st
=
taosGetTimestampUs
();
tBlockDataReset
(
pBlockData
);
tBlockDataReset
(
pBlockData
);
STSchema
*
pSchema
=
getLatestTableSchema
(
pReader
,
uid
);
if
(
pSchema
==
NULL
)
{
if
(
pReader
->
pSchema
==
NULL
)
{
tsdbDebug
(
"%p table uid:%"
PRIu64
" has been dropped, no data existed, %s"
,
pReader
,
uid
,
pReader
->
idStr
);
pSchema
=
getTableSchemaImpl
(
pReader
,
uid
);
return
code
;
if
(
pSchema
==
NULL
)
{
tsdbDebug
(
"%p table uid:%"
PRIu64
" has been dropped, no data existed, %s"
,
pReader
,
uid
,
pReader
->
idStr
);
return
code
;
}
}
}
SBlockLoadSuppInfo
*
pSup
=
&
pReader
->
suppInfo
;
SBlockLoadSuppInfo
*
pSup
=
&
pReader
->
suppInfo
;
...
@@ -1912,33 +1934,11 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
...
@@ -1912,33 +1934,11 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
return
code
;
return
code
;
}
}
STSchema
*
getLatestTableSchema
(
STsdbReader
*
pReader
,
uint64_t
uid
)
{
if
(
pReader
->
pSchema
!=
NULL
)
{
return
pReader
->
pSchema
;
}
int32_t
code
=
metaGetTbTSchemaEx
(
pReader
->
pTsdb
->
pVnode
->
pMeta
,
pReader
->
suid
,
uid
,
-
1
,
&
pReader
->
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pReader
->
pSchema
==
NULL
)
{
tsdbError
(
"failed to get table schema, uid:%"
PRIu64
", it may have been dropped, ver:-1, %s"
,
uid
,
pReader
->
idStr
);
}
return
pReader
->
pSchema
;
}
static
FORCE_INLINE
STSchema
*
doGetSchemaForTSRow
(
int32_t
sversion
,
STsdbReader
*
pReader
,
uint64_t
uid
)
{
static
FORCE_INLINE
STSchema
*
doGetSchemaForTSRow
(
int32_t
sversion
,
STsdbReader
*
pReader
,
uint64_t
uid
)
{
int32_t
code
=
0
;
// always set the newest schema version in pReader->pSchema
// always set the newest schema version in pReader->pSchema
if
(
pReader
->
pSchema
==
NULL
)
{
if
(
pReader
->
pSchema
==
NULL
)
{
code
=
metaGetTbTSchemaEx
(
pReader
->
pTsdb
->
pVnode
->
pMeta
,
pReader
->
suid
,
uid
,
-
1
,
&
pReader
->
pSchema
);
STSchema
*
ps
=
getTableSchemaImpl
(
pReader
,
uid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
ps
==
NULL
)
{
terrno
=
code
;
return
NULL
;
}
code
=
tsdbRowMergerInit
(
&
pReader
->
status
.
merger
,
pReader
->
pSchema
);
if
(
code
!=
0
)
{
terrno
=
code
;
return
NULL
;
return
NULL
;
}
}
}
}
...
@@ -1953,7 +1953,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
...
@@ -1953,7 +1953,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
}
}
STSchema
*
ptr
=
NULL
;
STSchema
*
ptr
=
NULL
;
code
=
metaGetTbTSchemaEx
(
pReader
->
pTsdb
->
pVnode
->
pMeta
,
pReader
->
suid
,
uid
,
sversion
,
&
ptr
);
int32_t
code
=
metaGetTbTSchemaEx
(
pReader
->
pTsdb
->
pVnode
->
pMeta
,
pReader
->
suid
,
uid
,
sversion
,
&
ptr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
terrno
=
code
;
return
NULL
;
return
NULL
;
...
@@ -1982,6 +1982,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
...
@@ -1982,6 +1982,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
TSDBKEY
k
=
TSDBROW_KEY
(
pRow
);
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
if
(
pMerger
->
pArray
==
NULL
)
{
ASSERT
(
pReader
->
pSchema
==
NULL
);
STSchema
*
ps
=
getTableSchemaImpl
(
pReader
,
pBlockScanInfo
->
uid
);
if
(
ps
==
NULL
)
{
return
terrno
;
}
}
int64_t
minKey
=
0
;
int64_t
minKey
=
0
;
if
(
pReader
->
order
==
TSDB_ORDER_ASC
)
{
if
(
pReader
->
order
==
TSDB_ORDER_ASC
)
{
minKey
=
INT64_MAX
;
// chosen the minimum value
minKey
=
INT64_MAX
;
// chosen the minimum value
...
@@ -2011,13 +2020,14 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
...
@@ -2011,13 +2020,14 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
}
}
// todo remove init
bool
init
=
false
;
bool
init
=
false
;
// ASC: file block ---> last block -----> imem -----> mem
// ASC: file block ---> last block -----> imem -----> mem
// DESC: mem -----> imem -----> last block -----> file block
// DESC: mem -----> imem -----> last block -----> file block
if
(
pReader
->
order
==
TSDB_ORDER_ASC
)
{
if
(
pReader
->
order
==
TSDB_ORDER_ASC
)
{
if
(
minKey
==
key
)
{
if
(
minKey
==
key
)
{
init
=
true
;
// todo check if pReader->pSchema is null or not
init
=
true
;
int32_t
code
=
tsdbRowMergerAdd
(
pMerger
,
&
fRow
,
pReader
->
pSchema
);
int32_t
code
=
tsdbRowMergerAdd
(
pMerger
,
&
fRow
,
pReader
->
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
...
@@ -2203,6 +2213,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
...
@@ -2203,6 +2213,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
static
int32_t
mergeFileBlockAndLastBlock
(
STsdbReader
*
pReader
,
SLastBlockReader
*
pLastBlockReader
,
int64_t
key
,
static
int32_t
mergeFileBlockAndLastBlock
(
STsdbReader
*
pReader
,
SLastBlockReader
*
pLastBlockReader
,
int64_t
key
,
STableBlockScanInfo
*
pBlockScanInfo
,
SBlockData
*
pBlockData
)
{
STableBlockScanInfo
*
pBlockScanInfo
,
SBlockData
*
pBlockData
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SRowMerger
*
pMerger
=
&
pReader
->
status
.
merger
;
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
if
(
pMerger
->
pArray
==
NULL
)
{
ASSERT
(
pReader
->
pSchema
==
NULL
);
STSchema
*
ps
=
getTableSchemaImpl
(
pReader
,
pBlockScanInfo
->
uid
);
if
(
ps
==
NULL
)
{
return
terrno
;
}
}
if
(
hasDataInFileBlock
(
pBlockData
,
pDumpInfo
))
{
if
(
hasDataInFileBlock
(
pBlockData
,
pDumpInfo
))
{
// no last block available, only data block exists
// no last block available, only data block exists
...
@@ -2220,8 +2240,6 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
...
@@ -2220,8 +2240,6 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
return
mergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
key
,
pReader
);
return
mergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
key
,
pReader
);
}
else
if
(
key
==
ts
)
{
}
else
if
(
key
==
ts
)
{
SRow
*
pTSRow
=
NULL
;
SRow
*
pTSRow
=
NULL
;
SRowMerger
*
pMerger
=
&
pReader
->
status
.
merger
;
int32_t
code
=
tsdbRowMergerAdd
(
pMerger
,
&
fRow
,
pReader
->
pSchema
);
int32_t
code
=
tsdbRowMergerAdd
(
pMerger
,
&
fRow
,
pReader
->
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
...
@@ -2285,6 +2303,15 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
...
@@ -2285,6 +2303,15 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return
code
;
return
code
;
}
}
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
if
(
pMerger
->
pArray
==
NULL
)
{
ASSERT
(
pReader
->
pSchema
==
NULL
);
STSchema
*
ps
=
getTableSchemaImpl
(
pReader
,
pBlockScanInfo
->
uid
);
if
(
ps
==
NULL
)
{
return
terrno
;
}
}
int64_t
minKey
=
0
;
int64_t
minKey
=
0
;
if
(
ASCENDING_TRAVERSE
(
pReader
->
order
))
{
if
(
ASCENDING_TRAVERSE
(
pReader
->
order
))
{
minKey
=
INT64_MAX
;
// let's find the minimum
minKey
=
INT64_MAX
;
// let's find the minimum
...
@@ -2596,6 +2623,7 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo*
...
@@ -2596,6 +2623,7 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo*
int32_t
mergeRowsInFileBlocks
(
SBlockData
*
pBlockData
,
STableBlockScanInfo
*
pBlockScanInfo
,
int64_t
key
,
int32_t
mergeRowsInFileBlocks
(
SBlockData
*
pBlockData
,
STableBlockScanInfo
*
pBlockScanInfo
,
int64_t
key
,
STsdbReader
*
pReader
)
{
STsdbReader
*
pReader
)
{
SRowMerger
*
pMerger
=
&
pReader
->
status
.
merger
;
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
bool
copied
=
false
;
bool
copied
=
false
;
int32_t
code
=
tryCopyDistinctRowFromFileBlock
(
pReader
,
pBlockData
,
key
,
pDumpInfo
,
&
copied
);
int32_t
code
=
tryCopyDistinctRowFromFileBlock
(
pReader
,
pBlockData
,
key
,
pDumpInfo
,
&
copied
);
...
@@ -2603,6 +2631,15 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
...
@@ -2603,6 +2631,15 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
return
code
;
return
code
;
}
}
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
if
(
pMerger
->
pArray
==
NULL
)
{
ASSERT
(
pReader
->
pSchema
==
NULL
);
STSchema
*
ps
=
getTableSchemaImpl
(
pReader
,
pBlockScanInfo
->
uid
);
if
(
ps
==
NULL
)
{
return
terrno
;
}
}
if
(
copied
)
{
if
(
copied
)
{
pBlockScanInfo
->
lastKey
=
key
;
pBlockScanInfo
->
lastKey
=
key
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -2610,13 +2647,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
...
@@ -2610,13 +2647,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
SRow
*
pTSRow
=
NULL
;
SRow
*
pTSRow
=
NULL
;
code
=
tsdbRowMergerAdd
(
&
pReader
->
status
.
m
erger
,
&
fRow
,
pReader
->
pSchema
);
code
=
tsdbRowMergerAdd
(
pM
erger
,
&
fRow
,
pReader
->
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
}
}
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
);
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
);
code
=
tsdbRowMergerGetRow
(
&
pReader
->
status
.
m
erger
,
&
pTSRow
);
code
=
tsdbRowMergerGetRow
(
pM
erger
,
&
pTSRow
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
}
}
...
@@ -2624,7 +2661,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
...
@@ -2624,7 +2661,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
code
=
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
code
=
doAppendRowFromTSRow
(
pReader
->
pResBlock
,
pReader
,
pTSRow
,
pBlockScanInfo
);
taosMemoryFree
(
pTSRow
);
taosMemoryFree
(
pTSRow
);
tsdbRowMergerClear
(
&
pReader
->
status
.
m
erger
);
tsdbRowMergerClear
(
pM
erger
);
return
code
;
return
code
;
}
}
}
}
...
@@ -4412,6 +4449,10 @@ static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
...
@@ -4412,6 +4449,10 @@ static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
pDst
->
pSchema
=
pSrc
->
pSchema
;
pDst
->
pSchema
=
pSrc
->
pSchema
;
pDst
->
pSchemaMap
=
pSrc
->
pSchemaMap
;
pDst
->
pSchemaMap
=
pSrc
->
pSchemaMap
;
pDst
->
pReadSnap
=
pSrc
->
pReadSnap
;
pDst
->
pReadSnap
=
pSrc
->
pReadSnap
;
if
(
pDst
->
pSchema
)
{
tsdbRowMergerInit
(
&
pDst
->
status
.
merger
,
pDst
->
pSchema
);
}
}
}
void
tsdbReaderClose
(
STsdbReader
*
pReader
)
{
void
tsdbReaderClose
(
STsdbReader
*
pReader
)
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
e1bd65ec
...
@@ -1388,7 +1388,7 @@ _exit:
...
@@ -1388,7 +1388,7 @@ _exit:
// clear
// clear
taosArrayDestroy
(
newTbUids
);
taosArrayDestroy
(
newTbUids
);
tDestroySSubmitReq
2
(
pSubmitReq
,
0
==
pMsg
->
version
?
TSDB_MSG_FLG_CMPT
:
TSDB_MSG_FLG_DECODE
);
tDestroySSubmitReq
(
pSubmitReq
,
0
==
pMsg
->
version
?
TSDB_MSG_FLG_CMPT
:
TSDB_MSG_FLG_DECODE
);
tDestroySSubmitRsp2
(
pSubmitRsp
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitRsp2
(
pSubmitRsp
,
TSDB_MSG_FLG_ENCODE
);
if
(
code
)
terrno
=
code
;
if
(
code
)
terrno
=
code
;
...
...
source/libs/executor/src/dataInserter.c
浏览文件 @
e1bd65ec
...
@@ -301,7 +301,7 @@ _end:
...
@@ -301,7 +301,7 @@ _end:
if
(
terrno
!=
0
)
{
if
(
terrno
!=
0
)
{
*
ppReq
=
NULL
;
*
ppReq
=
NULL
;
if
(
pReq
)
{
if
(
pReq
)
{
tDestroySSubmitReq
2
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
taosMemoryFree
(
pReq
);
taosMemoryFree
(
pReq
);
}
}
return
terrno
;
return
terrno
;
...
@@ -326,7 +326,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
...
@@ -326,7 +326,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
code
=
buildSubmitReqFromBlock
(
pInserter
,
&
pReq
,
pDataBlock
,
pTSchema
,
uid
,
vgId
,
suid
);
code
=
buildSubmitReqFromBlock
(
pInserter
,
&
pReq
,
pDataBlock
,
pTSchema
,
uid
,
vgId
,
suid
);
if
(
code
)
{
if
(
code
)
{
if
(
pReq
)
{
if
(
pReq
)
{
tDestroySSubmitReq
2
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
taosMemoryFree
(
pReq
);
taosMemoryFree
(
pReq
);
}
}
...
@@ -335,7 +335,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
...
@@ -335,7 +335,7 @@ int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32
}
}
code
=
submitReqToMsg
(
vgId
,
pReq
,
pMsg
,
msgLen
);
code
=
submitReqToMsg
(
vgId
,
pReq
,
pMsg
,
msgLen
);
tDestroySSubmitReq
2
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
pReq
,
TSDB_MSG_FLG_ENCODE
);
taosMemoryFree
(
pReq
);
taosMemoryFree
(
pReq
);
return
code
;
return
code
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
e1bd65ec
...
@@ -1646,10 +1646,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
...
@@ -1646,10 +1646,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
blockDataCleanup
(
pInfo
->
pRes
);
blockDataCleanup
(
pInfo
->
pRes
);
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
while
(
tqNext
DataBlock
(
pInfo
->
tqReader
))
{
while
(
tqNext
BlockImpl
(
pInfo
->
tqReader
))
{
SSDataBlock
block
=
{
0
};
SSDataBlock
block
=
{
0
};
int32_t
code
=
tqRetrieveDataBlock
2
(
&
block
,
pInfo
->
tqReader
,
NULL
);
int32_t
code
=
tqRetrieveDataBlock
(
&
block
,
pInfo
->
tqReader
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
block
.
info
.
rows
==
0
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
||
block
.
info
.
rows
==
0
)
{
continue
;
continue
;
}
}
...
@@ -1687,23 +1687,23 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
...
@@ -1687,23 +1687,23 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if
(
pTaskInfo
->
streamInfo
.
currentOffset
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
pTaskInfo
->
streamInfo
.
currentOffset
.
type
==
TMQ_OFFSET__LOG
)
{
while
(
1
)
{
while
(
1
)
{
S
FetchRet
ret
=
{
0
};
S
SDataBlock
block
=
{
0
};
tqNextBlock
(
pInfo
->
tqReader
,
&
ret
);
int32_t
type
=
tqNextBlock
(
pInfo
->
tqReader
,
&
block
);
tqOffsetResetToLog
(
&
pTaskInfo
->
streamInfo
.
currentOffset
,
// curVersion move to next, so currentOffset = curVersion - 1
pInfo
->
tqReader
->
pWalReader
->
curVersion
-
1
);
// curVersion move to next, so currentOffset = curVersion - 1
tqOffsetResetToLog
(
&
pTaskInfo
->
streamInfo
.
currentOffset
,
pInfo
->
tqReader
->
pWalReader
->
curVersion
-
1
);
if
(
ret
.
fetchT
ype
==
FETCH_TYPE__DATA
)
{
if
(
t
ype
==
FETCH_TYPE__DATA
)
{
qDebug
(
"doQueueScan get data from log %"
PRId64
" rows, version:%"
PRId64
,
ret
.
data
.
info
.
rows
,
qDebug
(
"doQueueScan get data from log %"
PRId64
" rows, version:%"
PRId64
,
block
.
info
.
rows
,
pTaskInfo
->
streamInfo
.
currentOffset
.
version
);
pTaskInfo
->
streamInfo
.
currentOffset
.
version
);
blockDataCleanup
(
pInfo
->
pRes
);
blockDataCleanup
(
pInfo
->
pRes
);
setBlockIntoRes
(
pInfo
,
&
ret
.
data
,
true
);
setBlockIntoRes
(
pInfo
,
&
block
,
true
);
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
qDebug
(
"doQueueScan get data from log %"
PRId64
" rows, return, version:%"
PRId64
,
pInfo
->
pRes
->
info
.
rows
,
qDebug
(
"doQueueScan get data from log %"
PRId64
" rows, return, version:%"
PRId64
,
pInfo
->
pRes
->
info
.
rows
,
pTaskInfo
->
streamInfo
.
currentOffset
.
version
);
pTaskInfo
->
streamInfo
.
currentOffset
.
version
);
return
pInfo
->
pRes
;
return
pInfo
->
pRes
;
}
}
}
else
if
(
ret
.
fetchT
ype
==
FETCH_TYPE__NONE
)
{
}
else
if
(
t
ype
==
FETCH_TYPE__NONE
)
{
qDebug
(
"doQueueScan get none from log, return, version:%"
PRId64
,
pTaskInfo
->
streamInfo
.
currentOffset
.
version
);
qDebug
(
"doQueueScan get none from log, return, version:%"
PRId64
,
pTaskInfo
->
streamInfo
.
currentOffset
.
version
);
return
NULL
;
return
NULL
;
}
}
...
@@ -2072,11 +2072,10 @@ FETCH_NEXT_BLOCK:
...
@@ -2072,11 +2072,10 @@ FETCH_NEXT_BLOCK:
blockDataCleanup
(
pInfo
->
pRes
);
blockDataCleanup
(
pInfo
->
pRes
);
while
(
tqNext
DataBlock
(
pInfo
->
tqReader
))
{
while
(
tqNext
BlockImpl
(
pInfo
->
tqReader
))
{
SSDataBlock
block
=
{
0
};
SSDataBlock
block
=
{
0
};
int32_t
code
=
tqRetrieveDataBlock2
(
&
block
,
pInfo
->
tqReader
,
NULL
);
int32_t
code
=
tqRetrieveDataBlock
(
&
block
,
pInfo
->
tqReader
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
block
.
info
.
rows
==
0
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
||
block
.
info
.
rows
==
0
)
{
continue
;
continue
;
}
}
...
...
source/libs/parser/src/parInsertUtil.c
浏览文件 @
e1bd65ec
...
@@ -324,7 +324,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
...
@@ -324,7 +324,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
return
;
return
;
}
}
tDestroySSubmitReq
2
(
pVgCxt
->
pData
,
TSDB_MSG_FLG_ENCODE
);
tDestroySSubmitReq
(
pVgCxt
->
pData
,
TSDB_MSG_FLG_ENCODE
);
taosMemoryFree
(
pVgCxt
->
pData
);
taosMemoryFree
(
pVgCxt
->
pData
);
taosMemoryFree
(
pVgCxt
);
taosMemoryFree
(
pVgCxt
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录