Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2a7de0cd
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2a7de0cd
编写于
12月 21, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: remove assert
上级
86ff8f38
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
228 addition
and
197 deletion
+228
-197
include/common/tcommon.h
include/common/tcommon.h
+10
-9
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+4
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+57
-55
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+17
-11
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+26
-24
source/dnode/vnode/src/tq/tqOffset.c
source/dnode/vnode/src/tq/tqOffset.c
+7
-12
source/dnode/vnode/src/tq/tqOffsetSnapshot.c
source/dnode/vnode/src/tq/tqOffsetSnapshot.c
+15
-9
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+4
-10
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+9
-25
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+2
-8
source/dnode/vnode/src/tq/tqSnapshot.c
source/dnode/vnode/src/tq/tqSnapshot.c
+0
-2
source/dnode/vnode/src/tq/tqStreamStateSnap.c
source/dnode/vnode/src/tq/tqStreamStateSnap.c
+0
-3
source/dnode/vnode/src/tq/tqStreamTaskSnap.c
source/dnode/vnode/src/tq/tqStreamTaskSnap.c
+4
-7
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+55
-19
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+17
-3
source/libs/wal/src/walRef.c
source/libs/wal/src/walRef.c
+1
-0
未找到文件。
include/common/tcommon.h
浏览文件 @
2a7de0cd
...
...
@@ -195,7 +195,7 @@ typedef struct SDataBlockInfo {
uint32_t
capacity
;
SBlockID
id
;
int16_t
hasVarCol
;
int16_t
dataLoad
;
// denote if the data is loaded or not
int16_t
dataLoad
;
// denote if the data is loaded or not
// TODO: optimize and remove following
int64_t
version
;
// used for stream, and need serialization
...
...
@@ -204,8 +204,9 @@ typedef struct SDataBlockInfo {
STimeWindow
calWin
;
// used for stream, do not serialize
TSKEY
watermark
;
// used for stream
char
parTbName
[
TSDB_TABLE_NAME_LEN
];
// used for stream partition
STag
*
pTag
;
// used for stream partition
char
parTbName
[
TSDB_TABLE_NAME_LEN
];
// used for stream partition
int32_t
tagLen
;
void
*
pTag
;
// used for stream partition
}
SDataBlockInfo
;
typedef
struct
SSDataBlock
{
...
...
@@ -239,22 +240,22 @@ typedef struct SVarColAttr {
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef
struct
SColumnInfoData
{
char
*
pData
;
// the corresponding block data in memory
char
*
pData
;
// the corresponding block data in memory
union
{
char
*
nullbitmap
;
// bitmap, one bit for each item in the list
SVarColAttr
varmeta
;
};
SColumnInfo
info
;
// column info
bool
hasNull
;
// if current column data has null value.
SColumnInfo
info
;
// column info
bool
hasNull
;
// if current column data has null value.
}
SColumnInfoData
;
typedef
struct
SQueryTableDataCond
{
uint64_t
suid
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
int32_t
*
pSlotList
;
// the column output destation slot, and it may be null
int32_t
type
;
// data block load type:
int32_t
*
pSlotList
;
// the column output destation slot, and it may be null
int32_t
type
;
// data block load type:
STimeWindow
twindows
;
int64_t
startVersion
;
int64_t
endVersion
;
...
...
include/libs/stream/streamState.h
浏览文件 @
2a7de0cd
...
...
@@ -35,6 +35,7 @@ typedef struct STdbState {
TTB
*
pFillStateDb
;
// todo refactor
TTB
*
pSessionStateDb
;
TTB
*
pParNameDb
;
TTB
*
pParTagDb
;
TXN
*
txn
;
}
STdbState
;
...
...
@@ -108,6 +109,9 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
int32_t
streamStatePutParName
(
SStreamState
*
pState
,
int64_t
groupId
,
const
char
*
tbname
);
int32_t
streamStateGetParName
(
SStreamState
*
pState
,
int64_t
groupId
,
void
**
pVal
);
int32_t
streamStatePutParTag
(
SStreamState
*
pState
,
int64_t
groupId
,
const
void
*
tag
,
int32_t
tagLen
);
int32_t
streamStateGetParTag
(
SStreamState
*
pState
,
int64_t
groupId
,
void
**
tagVal
,
int32_t
*
tagLen
);
#if 0
char* streamStateSessionDump(SStreamState* pState);
#endif
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
2a7de0cd
...
...
@@ -92,21 +92,21 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosHashSetFreeFp
(
pTq
->
pCheckInfo
,
(
FDelete
)
tDeleteSTqCheckInfo
);
if
(
tqMetaOpen
(
pTq
)
<
0
)
{
ASSERT
(
0
)
;
return
NULL
;
}
pTq
->
pOffsetStore
=
tqOffsetOpen
(
pTq
);
if
(
pTq
->
pOffsetStore
==
NULL
)
{
ASSERT
(
0
)
;
return
NULL
;
}
pTq
->
pStreamMeta
=
streamMetaOpen
(
path
,
pTq
,
(
FTaskExpand
*
)
tqExpandTask
,
pTq
->
pVnode
->
config
.
vgId
);
if
(
pTq
->
pStreamMeta
==
NULL
)
{
ASSERT
(
0
)
;
return
NULL
;
}
if
(
streamLoadTasks
(
pTq
->
pStreamMeta
)
<
0
)
{
ASSERT
(
0
)
;
return
NULL
;
}
return
pTq
;
...
...
@@ -166,19 +166,17 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
int32_t
tqPushDataRsp
(
STQ
*
pTq
,
STqPushEntry
*
pPushEntry
)
{
SMqDataRsp
*
pRsp
=
&
pPushEntry
->
dataRsp
;
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockData
)
==
pRsp
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockDataLen
)
==
pRsp
->
blockNum
);
#if 0
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
A
SSERT
(
!
pRsp
->
withSchema
);
A
SSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
0
);
A(!pRsp->withSchema);
A(taosArrayGetSize(pRsp->blockSchema) == 0);
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
/*if (pRsp->blockNum > 0) {*/
/*ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);*/
/*} else {*/
ASSERT
(
pRsp
->
rspOffset
.
version
>
pRsp
->
reqOffset
.
version
);
/*}*/
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
}
#endif
int32_t
len
=
0
;
int32_t
code
=
0
;
...
...
@@ -223,19 +221,21 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
}
int32_t
tqSendDataRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
SMqDataRsp
*
pRsp
)
{
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockData
)
==
pRsp
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockDataLen
)
==
pRsp
->
blockNum
);
#if 0
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
A
SSERT
(
!
pRsp
->
withSchema
);
A
SSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
0
);
A(!pRsp->withSchema);
A(taosArrayGetSize(pRsp->blockSchema) == 0);
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
if (pRsp->blockNum > 0) {
A
SSERT
(
pRsp
->
rspOffset
.
version
>
pRsp
->
reqOffset
.
version
);
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
} else {
A
SSERT
(
pRsp
->
rspOffset
.
version
>=
pRsp
->
reqOffset
.
version
);
A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
}
}
#endif
int32_t
len
=
0
;
int32_t
code
=
0
;
...
...
@@ -279,22 +279,24 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
}
int32_t
tqSendTaosxRsp
(
STQ
*
pTq
,
const
SRpcMsg
*
pMsg
,
const
SMqPollReq
*
pReq
,
const
STaosxRsp
*
pRsp
)
{
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockData
)
==
pRsp
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockDataLen
)
==
pRsp
->
blockNum
);
#if 0
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
if (pRsp->withSchema) {
A
SSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
pRsp
->
blockNum
);
A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
} else {
A
SSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
0
);
A(taosArrayGetSize(pRsp->blockSchema) == 0);
}
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
if (pRsp->blockNum > 0) {
A
SSERT
(
pRsp
->
rspOffset
.
version
>
pRsp
->
reqOffset
.
version
);
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
} else {
A
SSERT
(
pRsp
->
rspOffset
.
version
>=
pRsp
->
reqOffset
.
version
);
A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
}
}
#endif
int32_t
len
=
0
;
int32_t
code
=
0
;
...
...
@@ -348,7 +350,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
msg
,
msgLen
);
if
(
tDecodeSTqOffset
(
&
decoder
,
&
offset
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tDecoderClear
(
&
decoder
);
...
...
@@ -362,8 +363,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
if
(
offset
.
val
.
version
+
1
==
version
)
{
offset
.
val
.
version
+=
1
;
}
}
else
{
ASSERT
(
0
);
/*} else {*/
/*A(0);*/
}
STqOffset
*
pOffset
=
tqOffsetRead
(
pTq
->
pOffsetStore
,
offset
.
subKey
);
if
(
pOffset
!=
NULL
&&
tqOffsetLessOrEqual
(
&
offset
,
pOffset
))
{
...
...
@@ -371,7 +372,6 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
}
if
(
tqOffsetWrite
(
pTq
->
pOffsetStore
,
&
offset
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -434,7 +434,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
}
#endif
ASSERT
(
subType
==
TOPIC_SUB_TYPE__COLUMN
);
/*A(subType == TOPIC_SUB_TYPE__COLUMN);*/
pRsp
->
withSchema
=
false
;
return
0
;
...
...
@@ -473,7 +473,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// 1.find handle
STqHandle
*
pHandle
=
taosHashGet
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
));
/*ASSERT(pHandle);*/
if
(
pHandle
==
NULL
)
{
tqError
(
"tmq poll: no consumer handle for consumer:%"
PRId64
", in vgId:%d, subkey %s"
,
consumerId
,
TD_VID
(
pTq
->
pVnode
),
req
.
subKey
);
...
...
@@ -560,7 +559,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqInitDataRsp
(
&
dataRsp
,
&
req
,
pHandle
->
execHandle
.
subType
);
// lock
taosWLockLatch
(
&
pTq
->
pushLock
);
tqScanData
(
pTq
,
pHandle
,
&
dataRsp
,
&
fetchOffsetNew
);
if
(
tqScanData
(
pTq
,
pHandle
,
&
dataRsp
,
&
fetchOffsetNew
)
<
0
)
{
return
-
1
;
}
#if 1
if
(
dataRsp
.
blockNum
==
0
&&
dataRsp
.
reqOffset
.
type
==
TMQ_OFFSET__LOG
&&
...
...
@@ -599,7 +600,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
// for taosx
ASSERT
(
pHandle
->
execHandle
.
subType
!=
TOPIC_SUB_TYPE__COLUMN
);
/*A(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);*/
SMqMetaRsp
metaRsp
=
{
0
};
...
...
@@ -607,7 +608,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqInitTaosxRsp
(
&
taosxRsp
,
&
req
);
if
(
fetchOffsetNew
.
type
!=
TMQ_OFFSET__LOG
)
{
tqScanTaosx
(
pTq
,
pHandle
,
&
taosxRsp
,
&
metaRsp
,
&
fetchOffsetNew
);
if
(
tqScanTaosx
(
pTq
,
pHandle
,
&
taosxRsp
,
&
metaRsp
,
&
fetchOffsetNew
)
<
0
)
{
return
-
1
;
}
if
(
metaRsp
.
metaRspLen
>
0
)
{
if
(
tqSendMetaPollRsp
(
pTq
,
pMsg
,
&
req
,
&
metaRsp
)
<
0
)
{
...
...
@@ -690,8 +693,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
else
{
ASSERT
(
pHandle
->
fetchMeta
);
ASSERT
(
IS_META_MSG
(
pHead
->
msgType
));
/*A(pHandle->fetchMeta);*/
/*A(IS_META_MSG(pHead->msgType));*/
tqDebug
(
"fetch meta msg, ver:%"
PRId64
", type:%d"
,
pHead
->
version
,
pHead
->
msgType
);
tqOffsetResetToLog
(
&
metaRsp
.
rspOffset
,
fetchVer
);
metaRsp
.
resMsgType
=
pHead
->
msgType
;
...
...
@@ -808,7 +811,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
// TODO version should be assigned and refed during preprocess
SWalRef
*
pRef
=
walRefCommittedVer
(
pTq
->
pVnode
->
pWal
);
if
(
pRef
==
NULL
)
{
ASSERT
(
0
);
return
-
1
;
}
int64_t
ver
=
pRef
->
refVer
;
...
...
@@ -829,12 +831,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
pHandle
->
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
,
&
pHandle
->
execHandle
.
numOfCols
,
NULL
);
ASSERT
(
pHandle
->
execHandle
.
task
);
/*A(pHandle->execHandle.task);*/
void
*
scanner
=
NULL
;
qExtractStreamScanner
(
pHandle
->
execHandle
.
task
,
&
scanner
);
ASSERT
(
scanner
);
/*A(scanner);*/
pHandle
->
execHandle
.
pExecReader
=
qExtractReaderFromStreamScanner
(
scanner
);
ASSERT
(
pHandle
->
execHandle
.
pExecReader
);
/*A(pHandle->execHandle.pExecReader);*/
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__DB
)
{
pHandle
->
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
pHandle
->
execHandle
.
pExecReader
=
tqOpenReader
(
pTq
->
pVnode
);
...
...
@@ -867,19 +869,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
taosHashPut
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pHandle
,
sizeof
(
STqHandle
));
tqDebug
(
"try to persist handle %s consumer %"
PRId64
,
req
.
subKey
,
pHandle
->
consumerId
);
if
(
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
// TODO
ASSERT
(
0
);
}
}
else
{
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
// TODO handle qmsg and exec modification
atomic_store_32
(
&
pHandle
->
epoch
,
-
1
);
atomic_store_64
(
&
pHandle
->
consumerId
,
req
.
newConsumerId
);
atomic_add_fetch_32
(
&
pHandle
->
epoch
,
1
);
taosMemoryFree
(
req
.
qmsg
);
if
(
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
// TODO
ASSERT
(
0
);
}
// close handle
}
...
...
@@ -888,9 +885,11 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
}
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int64_t
ver
)
{
#if 0
if (pTask->taskLevel == TASK_LEVEL__AGG) {
A
SSERT
(
taosArrayGetSize
(
pTask
->
childEpInfo
)
!=
0
);
A(taosArrayGetSize(pTask->childEpInfo) != 0);
}
#endif
pTask
->
refCnt
=
1
;
pTask
->
schedStatus
=
TASK_SCHED_STATUS__INACTIVE
;
...
...
@@ -927,7 +926,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
.
pStateBackend
=
pTask
->
pState
,
};
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
ASSERT
(
pTask
->
exec
.
executor
);
if
(
pTask
->
exec
.
executor
==
NULL
)
{
return
-
1
;
}
}
else
if
(
pTask
->
taskLevel
==
TASK_LEVEL__AGG
)
{
pTask
->
pState
=
streamStateOpen
(
pTq
->
pStreamMeta
->
path
,
pTask
,
false
,
-
1
,
-
1
);
...
...
@@ -940,7 +941,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
.
pStateBackend
=
pTask
->
pState
,
};
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
mgHandle
);
ASSERT
(
pTask
->
exec
.
executor
);
if
(
pTask
->
exec
.
executor
==
NULL
)
{
return
-
1
;
}
}
// sink
...
...
@@ -952,12 +955,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask
->
tbSink
.
vnode
=
pTq
->
pVnode
;
pTask
->
tbSink
.
tbSinkFunc
=
tqSinkToTablePipeline
;
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
);
/*A(pTask->tbSink.pSchemaWrapper);*/
/*A(pTask->tbSink.pSchemaWrapper->pSchema);*/
pTask
->
tbSink
.
pTSchema
=
tdGetSTSChemaFromSSChema
(
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
,
pTask
->
tbSink
.
pSchemaWrapper
->
nCols
,
1
);
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
/*A(pTask->tbSink.pTSchema);*/
}
streamSetupTrigger
(
pTask
);
...
...
@@ -1003,7 +1006,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t
len
;
tEncodeSize
(
tEncodeSStreamTaskCheckRsp
,
&
rsp
,
len
,
code
);
if
(
code
<
0
)
{
ASSERT
(
0
);
tqError
(
"unable to encode rsp %d"
,
__LINE__
);
return
-
1
;
}
void
*
buf
=
rpcMallocCont
(
sizeof
(
SMsgHead
)
+
len
);
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
req
.
upstreamNodeId
);
...
...
@@ -1096,12 +1100,10 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
if
(
pTask
==
NULL
)
{
return
-
1
;
}
ASSERT
(
pReq
->
taskId
==
pTask
->
taskId
);
// check param
int64_t
fillVer1
=
pTask
->
startVer
;
if
(
fillVer1
<=
0
)
{
ASSERT
(
0
);
streamMetaReleaseTask
(
pTq
->
pStreamMeta
,
pTask
);
return
-
1
;
}
...
...
@@ -1296,7 +1298,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
}
int32_t
ref
=
atomic_sub_fetch_32
(
pRef
,
1
);
ASSERT
(
ref
>=
0
);
/*A(ref >= 0);*/
if
(
ref
==
0
)
{
blockDataDestroy
(
pDelBlock
);
taosMemoryFree
(
pRef
);
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
2a7de0cd
...
...
@@ -29,7 +29,6 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
int32_t
actualLen
=
blockEncode
(
pBlock
,
pRetrieve
->
data
,
numOfCols
);
actualLen
+=
sizeof
(
SRetrieveTableRsp
);
ASSERT
(
actualLen
<=
dataStrLen
);
taosArrayPush
(
pRsp
->
blockDataLen
,
&
actualLen
);
taosArrayPush
(
pRsp
->
blockData
,
&
buf
);
return
0
;
...
...
@@ -62,7 +61,6 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
int32_t
tqScanData
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
)
{
const
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
ASSERT
(
pExec
->
subType
==
TOPIC_SUB_TYPE__COLUMN
);
qTaskInfo_t
task
=
pExec
->
task
;
...
...
@@ -87,7 +85,8 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
uint64_t
ts
=
0
;
tqDebug
(
"vgId:%d, tmq task start to execute"
,
pTq
->
pVnode
->
config
.
vgId
);
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
tqError
(
"vgId:%d task exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
());
return
-
1
;
}
tqDebug
(
"vgId:%d, tmq task executed, get %p"
,
pTq
->
pVnode
->
config
.
vgId
,
pDataBlock
);
...
...
@@ -105,10 +104,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
}
if
(
qStreamExtractOffset
(
task
,
&
pRsp
->
rspOffset
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
ASSERT
(
pRsp
->
rspOffset
.
type
!=
0
);
if
(
pRsp
->
rspOffset
.
type
==
0
)
{
tqError
(
"expected rsp offset: type %d %"
PRId64
" %"
PRId64
" %"
PRId64
,
pRsp
->
rspOffset
.
type
,
pRsp
->
rspOffset
.
ts
,
pRsp
->
rspOffset
.
uid
,
pRsp
->
rspOffset
.
version
);
return
-
1
;
}
if
(
pRsp
->
withTbName
)
{
if
(
pRsp
->
rspOffset
.
type
==
TMQ_OFFSET__LOG
)
{
...
...
@@ -118,7 +121,6 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
pRsp
->
withTbName
=
false
;
}
}
ASSERT
(
pRsp
->
withSchema
==
false
);
return
0
;
}
...
...
@@ -148,7 +150,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
uint64_t
ts
=
0
;
tqDebug
(
"tmqsnap task start to execute"
);
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
tqError
(
"vgId:%d task exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
());
return
-
1
;
}
tqDebug
(
"tmqsnap task execute end, get %p"
,
pDataBlock
);
...
...
@@ -215,17 +218,20 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
break
;
}
if
(
qStreamExtractOffset
(
task
,
&
pRsp
->
rspOffset
)
<
0
)
{
ASSERT
(
0
);
qStreamExtractOffset
(
task
,
&
pRsp
->
rspOffset
);
if
(
pRsp
->
rspOffset
.
type
==
0
)
{
tqError
(
"expected rsp offset: type %d %"
PRId64
" %"
PRId64
" %"
PRId64
,
pRsp
->
rspOffset
.
type
,
pRsp
->
rspOffset
.
ts
,
pRsp
->
rspOffset
.
uid
,
pRsp
->
rspOffset
.
version
);
return
-
1
;
}
ASSERT
(
pRsp
->
rspOffset
.
type
!=
0
);
return
0
;
}
int32_t
tqTaosxScanLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
SSubmitReq
*
pReq
,
STaosxRsp
*
pRsp
)
{
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
ASSERT
(
pExec
->
subType
!=
TOPIC_SUB_TYPE__COLUMN
);
/*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/
SArray
*
pBlocks
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
SArray
*
pSchemas
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
2a7de0cd
...
...
@@ -71,17 +71,14 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
int32_t
tqMetaOpen
(
STQ
*
pTq
)
{
if
(
tdbOpen
(
pTq
->
path
,
16
*
1024
,
1
,
&
pTq
->
pMetaDB
,
0
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
if
(
tdbTbOpen
(
"tq.db"
,
-
1
,
-
1
,
NULL
,
pTq
->
pMetaDB
,
&
pTq
->
pExecStore
,
0
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
if
(
tdbTbOpen
(
"tq.check.db"
,
-
1
,
-
1
,
NULL
,
pTq
->
pMetaDB
,
&
pTq
->
pCheckStore
,
0
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -135,19 +132,19 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
if
(
tdbBegin
(
pTq
->
pMetaDB
,
&
txn
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
if
(
tdbTbDelete
(
pTq
->
pCheckStore
,
key
,
(
int
)
strlen
(
key
),
txn
)
<
0
)
{
/*ASSERT(0);*/
tqWarn
(
"vgId:%d, tq try delete checkinfo failed %s"
,
pTq
->
pVnode
->
config
.
vgId
,
key
);
}
if
(
tdbCommit
(
pTq
->
pMetaDB
,
txn
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
if
(
tdbPostCommit
(
pTq
->
pMetaDB
,
txn
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
return
0
;
...
...
@@ -156,7 +153,6 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
int32_t
tqMetaRestoreCheckInfo
(
STQ
*
pTq
)
{
TBC
*
pCur
=
NULL
;
if
(
tdbTbcOpen
(
pTq
->
pCheckStore
,
&
pCur
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -197,40 +193,42 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
int32_t
code
;
int32_t
vlen
;
tEncodeSize
(
tEncodeSTqHandle
,
pHandle
,
vlen
,
code
);
ASSERT
(
code
==
0
);
if
(
code
<
0
)
{
return
-
1
;
}
tqDebug
(
"tq save %s(%d) consumer %"
PRId64
" vgId:%d"
,
pHandle
->
subKey
,
(
int32_t
)
strlen
(
pHandle
->
subKey
),
pHandle
->
consumerId
,
TD_VID
(
pTq
->
pVnode
));
void
*
buf
=
taosMemoryCalloc
(
1
,
vlen
);
if
(
buf
==
NULL
)
{
ASSERT
(
0
)
;
return
-
1
;
}
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
buf
,
vlen
);
if
(
tEncodeSTqHandle
(
&
encoder
,
pHandle
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
TXN
*
txn
;
if
(
tdbBegin
(
pTq
->
pMetaDB
,
&
txn
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
if
(
tdbTbUpsert
(
pTq
->
pExecStore
,
key
,
(
int
)
strlen
(
key
),
buf
,
vlen
,
txn
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
if
(
tdbCommit
(
pTq
->
pMetaDB
,
txn
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
if
(
tdbPostCommit
(
pTq
->
pMetaDB
,
txn
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
tEncoderClear
(
&
encoder
);
...
...
@@ -243,19 +241,18 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
if
(
tdbBegin
(
pTq
->
pMetaDB
,
&
txn
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
if
(
tdbTbDelete
(
pTq
->
pExecStore
,
key
,
(
int
)
strlen
(
key
),
txn
)
<
0
)
{
/*ASSERT(0);*/
}
if
(
tdbCommit
(
pTq
->
pMetaDB
,
txn
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
if
(
tdbPostCommit
(
pTq
->
pMetaDB
,
txn
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
return
0
;
...
...
@@ -264,7 +261,6 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
int32_t
tqMetaRestoreHandle
(
STQ
*
pTq
)
{
TBC
*
pCur
=
NULL
;
if
(
tdbTbcOpen
(
pTq
->
pExecStore
,
&
pCur
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -284,7 +280,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle
.
pRef
=
walOpenRef
(
pTq
->
pVnode
->
pWal
);
if
(
handle
.
pRef
==
NULL
)
{
ASSERT
(
0
);
return
-
1
;
}
walRefVer
(
handle
.
pRef
,
handle
.
snapshotVer
);
...
...
@@ -300,12 +295,19 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
handle
.
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
handle
.
execHandle
.
execCol
.
qmsg
,
&
reader
,
&
handle
.
execHandle
.
numOfCols
,
NULL
);
ASSERT
(
handle
.
execHandle
.
task
);
if
(
handle
.
execHandle
.
task
==
NULL
)
{
tqError
(
"cannot create exec task for %s"
,
handle
.
subKey
);
return
-
1
;
}
void
*
scanner
=
NULL
;
qExtractStreamScanner
(
handle
.
execHandle
.
task
,
&
scanner
);
ASSERT
(
scanner
);
if
(
scanner
==
NULL
)
{
tqError
(
"cannot extract stream scanner for %s"
,
handle
.
subKey
);
}
handle
.
execHandle
.
pExecReader
=
qExtractReaderFromStreamScanner
(
scanner
);
ASSERT
(
handle
.
execHandle
.
pExecReader
);
if
(
handle
.
execHandle
.
pExecReader
==
NULL
)
{
tqError
(
"cannot extract exec reader for %s"
,
handle
.
subKey
);
}
}
else
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__DB
)
{
handle
.
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
handle
.
execHandle
.
pExecReader
=
tqOpenReader
(
pTq
->
pVnode
);
...
...
source/dnode/vnode/src/tq/tqOffset.c
浏览文件 @
2a7de0cd
...
...
@@ -40,26 +40,23 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
if
(
code
==
0
)
{
break
;
}
else
{
ASSERT
(
0
);
// TODO handle error
return
-
1
;
}
}
int32_t
size
=
htonl
(
head
.
size
);
void
*
memBuf
=
taosMemoryCalloc
(
1
,
size
);
if
((
code
=
taosReadFile
(
pFile
,
memBuf
,
size
))
!=
size
)
{
ASSERT
(
0
);
// TODO handle error
return
-
1
;
}
STqOffset
offset
;
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
memBuf
,
size
);
if
(
tDecodeSTqOffset
(
&
decoder
,
&
offset
)
<
0
)
{
ASSERT
(
0
)
;
return
-
1
;
}
tDecoderClear
(
&
decoder
);
if
(
taosHashPut
(
pStore
->
pHash
,
offset
.
subKey
,
strlen
(
offset
.
subKey
),
&
offset
,
sizeof
(
STqOffset
))
<
0
)
{
ASSERT
(
0
);
// TODO
return
-
1
;
}
taosMemoryFree
(
memBuf
);
}
...
...
@@ -85,7 +82,9 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
}
char
*
fname
=
tqOffsetBuildFName
(
pStore
->
pTq
->
path
,
0
);
if
(
tqOffsetRestoreFromFile
(
pStore
,
fname
)
<
0
)
{
ASSERT
(
0
);
taosMemoryFree
(
fname
);
taosMemoryFree
(
pStore
);
return
NULL
;
}
taosMemoryFree
(
fname
);
return
pStore
;
...
...
@@ -124,7 +123,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
const
char
*
sysErrStr
=
strerror
(
errno
);
tqError
(
"vgId:%d, cannot open file %s when commit offset since %s"
,
pStore
->
pTq
->
pVnode
->
config
.
vgId
,
fname
,
sysErrStr
);
ASSERT
(
0
);
return
-
1
;
}
taosMemoryFree
(
fname
);
...
...
@@ -136,9 +134,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
int32_t
bodyLen
;
int32_t
code
;
tEncodeSize
(
tEncodeSTqOffset
,
pOffset
,
bodyLen
,
code
);
ASSERT
(
code
==
0
);
if
(
code
<
0
)
{
ASSERT
(
0
);
taosHashCancelIterate
(
pStore
->
pHash
,
pIter
);
return
-
1
;
}
...
...
@@ -154,7 +150,6 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
// write file
int64_t
writeLen
;
if
((
writeLen
=
taosWriteFile
(
pFile
,
buf
,
totLen
))
!=
totLen
)
{
ASSERT
(
0
);
tqError
(
"write offset incomplete, len %d, write len %"
PRId64
,
bodyLen
,
writeLen
);
taosHashCancelIterate
(
pStore
->
pHash
,
pIter
);
taosMemoryFree
(
buf
);
...
...
source/dnode/vnode/src/tq/tqOffsetSnapshot.c
浏览文件 @
2a7de0cd
...
...
@@ -56,24 +56,28 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
TdFilePtr
pFile
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
if
(
pFile
==
NULL
)
{
taosMemoryFree
(
fname
);
return
0
;
return
-
1
;
}
int64_t
sz
=
0
;
if
(
taosStatFile
(
fname
,
&
sz
,
NULL
)
<
0
)
{
ASSERT
(
0
);
taosCloseFile
(
&
pFile
);
taosMemoryFree
(
fname
);
return
-
1
;
}
taosMemoryFree
(
fname
);
SSnapDataHdr
*
buf
=
taosMemoryCalloc
(
1
,
sz
+
sizeof
(
SSnapDataHdr
));
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
taosCloseFile
(
&
pFile
);
return
terrno
;
}
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SSnapDataHdr
));
int64_t
contLen
=
taosReadFile
(
pFile
,
abuf
,
sz
);
if
(
contLen
!=
sz
)
{
ASSERT
(
0
);
taosCloseFile
(
&
pFile
);
taosMemoryFree
(
buf
);
return
-
1
;
}
buf
->
size
=
sz
;
...
...
@@ -122,14 +126,17 @@ int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback) {
if
(
rollback
)
{
if
(
taosRemoveFile
(
pWriter
->
fname
)
<
0
)
{
ASSERT
(
0
);
taosMemoryFree
(
fname
);
return
-
1
;
}
}
else
{
if
(
taosRenameFile
(
pWriter
->
fname
,
fname
)
<
0
)
{
ASSERT
(
0
);
taosMemoryFree
(
fname
);
return
-
1
;
}
if
(
tqOffsetRestoreFromFile
(
pTq
->
pOffsetStore
,
fname
)
<
0
)
{
ASSERT
(
0
);
taosMemoryFree
(
fname
);
return
-
1
;
}
}
taosMemoryFree
(
fname
);
...
...
@@ -146,14 +153,13 @@ int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nDa
TdFilePtr
pFile
=
taosOpenFile
(
pWriter
->
fname
,
TD_FILE_CREATE
|
TD_FILE_WRITE
);
SSnapDataHdr
*
pHdr
=
(
SSnapDataHdr
*
)
pData
;
int64_t
size
=
pHdr
->
size
;
ASSERT
(
size
==
nData
-
sizeof
(
SSnapDataHdr
));
if
(
pFile
)
{
int64_t
contLen
=
taosWriteFile
(
pFile
,
pHdr
->
data
,
size
);
if
(
contLen
!=
size
)
{
ASSERT
(
0
);
taosCloseFile
(
&
pFile
);
return
-
1
;
}
}
else
{
ASSERT
(
0
);
return
-
1
;
}
return
0
;
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
2a7de0cd
...
...
@@ -25,9 +25,7 @@ void tqTmrRspFunc(void* param, void* tmrId) {
static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) {
SStreamDataSubmit* pSubmit = *ppSubmit;
while (pSubmit != NULL) {
ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1);
if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) {
/*ASSERT(0);*/
}
// update processed
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
...
...
@@ -160,8 +158,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
if (msgType == TDMT_VND_SUBMIT) {
tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId);
} else {
// TODO
ASSERT(0);
tqError("tq push unexpected msg type %d", msgType);
}
if (rsp.blockNum == 0) {
...
...
@@ -169,9 +166,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
continue;
}
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
rsp.rspOffset = fetchOffset;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
...
...
@@ -263,7 +257,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
tqDebug
(
"vgId:%d, tq exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
()
);
}
if
(
pDataBlock
==
NULL
)
{
...
...
@@ -282,7 +276,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
// remove from hash
size_t
kLen
;
void
*
key
=
taosHashGetKey
(
pIter
,
&
kLen
);
void
*
keyCopy
=
taosMemory
Malloc
(
kLen
);
void
*
keyCopy
=
taosMemory
Calloc
(
1
,
kLen
+
1
);
memcpy
(
keyCopy
,
key
,
kLen
);
taosArrayPush
(
cachedKeys
,
&
keyCopy
);
...
...
@@ -296,7 +290,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
void
*
key
=
taosArrayGetP
(
cachedKeys
,
i
);
size_t
kLen
=
*
(
size_t
*
)
taosArrayGet
(
cachedKeyLens
,
i
);
if
(
taosHashRemove
(
pTq
->
pPushMgr
,
key
,
kLen
)
!=
0
)
{
ASSERT
(
0
);
tqError
(
"vgId:%d, tq push hash remove key error, key: %s"
,
pTq
->
pVnode
->
config
.
vgId
,
(
char
*
)
key
);
}
}
taosArrayDestroyP
(
cachedKeys
,
(
FDelete
)
taosMemoryFree
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
2a7de0cd
...
...
@@ -176,8 +176,6 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
goto
end
;
}
realTbSuid
=
req
.
suid
;
}
else
{
ASSERT
(
0
);
}
end:
...
...
@@ -206,7 +204,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
code
=
walFetchBody
(
pHandle
->
pWalReader
,
ppCkHead
);
if
(
code
<
0
)
{
ASSERT
(
0
);
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
...
...
@@ -220,7 +217,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
if
(
IS_META_MSG
(
pHead
->
msgType
))
{
code
=
walFetchBody
(
pHandle
->
pWalReader
,
ppCkHead
);
if
(
code
<
0
)
{
ASSERT
(
0
);
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
...
...
@@ -238,7 +234,6 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
}
code
=
walSkipFetchBody
(
pHandle
->
pWalReader
,
*
ppCkHead
);
if
(
code
<
0
)
{
ASSERT
(
0
);
*
fetchOffset
=
offset
;
code
=
-
1
;
goto
END
;
...
...
@@ -297,11 +292,8 @@ void tqCloseReader(STqReader* pReader) {
int32_t
tqSeekVer
(
STqReader
*
pReader
,
int64_t
ver
)
{
if
(
walReadSeekVer
(
pReader
->
pWalReader
,
ver
)
<
0
)
{
ASSERT
(
pReader
->
pWalReader
->
curInvalid
);
ASSERT
(
pReader
->
pWalReader
->
curVersion
==
ver
);
return
-
1
;
}
ASSERT
(
pReader
->
pWalReader
->
curVersion
==
ver
);
return
0
;
}
...
...
@@ -317,7 +309,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
ret
->
offset
.
version
=
pReader
->
ver
;
ret
->
fetchType
=
FETCH_TYPE__NONE
;
tqDebug
(
"return offset %"
PRId64
", no more valid"
,
ret
->
offset
.
version
);
ASSERT
(
ret
->
offset
.
version
>=
0
);
return
-
1
;
}
void
*
body
=
pReader
->
pWalReader
->
pHead
->
head
.
body
;
...
...
@@ -340,7 +331,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
memset
(
&
ret
->
data
,
0
,
sizeof
(
SSDataBlock
));
int32_t
code
=
tqRetrieveDataBlock
(
&
ret
->
data
,
pReader
);
if
(
code
!=
0
||
ret
->
data
.
info
.
rows
==
0
)
{
ASSERT
(
0
);
continue
;
}
ret
->
fetchType
=
FETCH_TYPE__DATA
;
...
...
@@ -351,7 +341,6 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
if
(
fromProcessedMsg
)
{
ret
->
offset
.
type
=
TMQ_OFFSET__LOG
;
ret
->
offset
.
version
=
pReader
->
ver
;
ASSERT
(
pReader
->
ver
>=
0
);
ret
->
fetchType
=
FETCH_TYPE__SEP
;
tqDebug
(
"return offset %"
PRId64
", processed finish"
,
ret
->
offset
.
version
);
return
0
;
...
...
@@ -434,7 +423,6 @@ bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
}
if
(
pHandle
->
pBlock
==
NULL
)
return
false
;
ASSERT
(
pHandle
->
tbIdHash
==
NULL
);
void
*
ret
=
taosHashGet
(
filterOutUids
,
&
pHandle
->
msgIter
.
uid
,
sizeof
(
int64_t
));
if
(
ret
==
NULL
)
{
return
true
;
...
...
@@ -453,7 +441,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
if
(
pReader
->
pSchema
==
NULL
)
{
tqWarn
(
"cannot found tsschema for table: uid:%"
PRId64
" (suid:%"
PRId64
"), version %d, possibly dropped table"
,
pReader
->
msgIter
.
uid
,
pReader
->
msgIter
.
suid
,
pReader
->
cachedSchemaVer
);
/*ASSERT(0);*/
pReader
->
cachedSchemaSuid
=
0
;
terrno
=
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
;
return
-
1
;
...
...
@@ -464,7 +451,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
if
(
pReader
->
pSchemaWrapper
==
NULL
)
{
tqWarn
(
"cannot found schema wrapper for table: suid:%"
PRId64
", version %d, possibly dropped table"
,
pReader
->
msgIter
.
uid
,
pReader
->
cachedSchemaVer
);
/*ASSERT(0);*/
pReader
->
cachedSchemaSuid
=
0
;
terrno
=
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
;
return
-
1
;
...
...
@@ -567,7 +553,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
if
(
pReader
->
pSchema
==
NULL
)
{
tqWarn
(
"cannot found tsschema for table: uid:%"
PRId64
" (suid:%"
PRId64
"), version %d, possibly dropped table"
,
pReader
->
msgIter
.
uid
,
pReader
->
msgIter
.
suid
,
pReader
->
cachedSchemaVer
);
/*ASSERT(0);*/
pReader
->
cachedSchemaSuid
=
0
;
terrno
=
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
;
return
-
1
;
...
...
@@ -578,7 +563,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
if
(
pReader
->
pSchemaWrapper
==
NULL
)
{
tqWarn
(
"cannot found schema wrapper for table: suid:%"
PRId64
", version %d, possibly dropped table"
,
pReader
->
msgIter
.
uid
,
pReader
->
cachedSchemaVer
);
/*ASSERT(0);*/
pReader
->
cachedSchemaSuid
=
0
;
terrno
=
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
;
return
-
1
;
...
...
@@ -671,8 +655,6 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas
break
;
}
ASSERT
(
sVal
.
valType
!=
TD_VTYPE_NONE
);
if
(
colDataAppend
(
pColData
,
curRow
,
sVal
.
val
,
sVal
.
valType
==
TD_VTYPE_NULL
)
<
0
)
{
goto
FAIL
;
}
...
...
@@ -732,8 +714,6 @@ int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) {
}
int
tqReaderRemoveTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
)
{
ASSERT
(
pReader
->
tbIdHash
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
*
pKey
=
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashRemove
(
pReader
->
tbIdHash
,
pKey
,
sizeof
(
int64_t
));
...
...
@@ -750,7 +730,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
STqHandle
*
pExec
=
(
STqHandle
*
)
pIter
;
if
(
pExec
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pExec
->
execHandle
.
task
,
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
if
(
code
!=
0
)
{
tqError
(
"update qualified table error for %s"
,
pExec
->
subKey
);
continue
;
}
}
else
if
(
pExec
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__DB
)
{
if
(
!
isAdd
)
{
int32_t
sz
=
taosArrayGetSize
(
tbUidList
);
...
...
@@ -769,7 +752,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
int32_t
code
=
metaGetTableEntryByUidCache
(
&
mr
,
*
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s"
,
*
id
,
tstrerror
(
terrno
));
t
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s"
,
*
id
,
tstrerror
(
terrno
));
continue
;
}
...
...
@@ -790,8 +773,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
else
{
// TODO handle delete table from stb
}
}
else
{
ASSERT
(
0
);
}
}
while
(
1
)
{
...
...
@@ -800,7 +781,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
pIter
;
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pTask
->
exec
.
executor
,
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
if
(
code
!=
0
)
{
tqError
(
"update qualified table error for stream task %d"
,
pTask
->
taskId
);
continue
;
}
}
}
return
0
;
...
...
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
2a7de0cd
...
...
@@ -19,7 +19,6 @@
int32_t
tqBuildDeleteReq
(
SVnode
*
pVnode
,
const
char
*
stbFullName
,
const
SSDataBlock
*
pDataBlock
,
SBatchDeleteReq
*
deleteReq
)
{
ASSERT
(
pDataBlock
->
info
.
type
==
STREAM_DELETE_RESULT
);
int32_t
totRow
=
pDataBlock
->
info
.
rows
;
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
...
...
@@ -334,8 +333,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
int32_t
code
;
tEncodeSize
(
tEncodeSBatchDeleteReq
,
&
deleteReq
,
len
,
code
);
if
(
code
<
0
)
{
//
ASSERT
(
0
)
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
;
}
SEncoder
encoder
;
void
*
serializedDeleteReq
=
rpcMallocCont
(
len
+
sizeof
(
SMsgHead
));
...
...
@@ -559,7 +558,6 @@ void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true,
pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq);
...
...
@@ -570,10 +568,6 @@ void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data
int32_t code;
int32_t len;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code < 0) {
//
ASSERT(0);
}
SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
...
...
source/dnode/vnode/src/tq/tqSnapshot.c
浏览文件 @
2a7de0cd
...
...
@@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
}
}
ASSERT
(
pVal
&&
vLen
);
*
ppData
=
taosMemoryMalloc
(
sizeof
(
SSnapDataHdr
)
+
vLen
);
if
(
*
ppData
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/dnode/vnode/src/tq/tqStreamStateSnap.c
浏览文件 @
2a7de0cd
...
...
@@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
}
}
ASSERT
(
pVal
&&
vLen
);
*
ppData
=
taosMemoryMalloc
(
sizeof
(
SSnapDataHdr
)
+
vLen
);
if
(
*
ppData
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -168,7 +166,6 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
if
(
rollback
)
{
tdbAbort
(
pWriter
->
pTq
->
pMetaDB
,
pWriter
->
txn
);
ASSERT
(
0
);
}
else
{
code
=
tdbCommit
(
pWriter
->
pTq
->
pMetaDB
,
pWriter
->
txn
);
if
(
code
)
goto
_err
;
...
...
source/dnode/vnode/src/tq/tqStreamTaskSnap.c
浏览文件 @
2a7de0cd
...
...
@@ -100,8 +100,6 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
}
}
ASSERT
(
pVal
&&
vLen
);
*
ppData
=
taosMemoryMalloc
(
sizeof
(
SSnapDataHdr
)
+
vLen
);
if
(
*
ppData
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -146,7 +144,7 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p
pWriter
->
sver
=
sver
;
pWriter
->
ever
=
ever
;
if
(
tdbBegin
(
pTq
->
pMeta
Store
,
&
pWriter
->
txn
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
0
)
<
0
)
{
if
(
tdbBegin
(
pTq
->
pMeta
DB
,
&
pWriter
->
txn
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
0
)
<
0
)
{
code
=
-
1
;
taosMemoryFree
(
pWriter
);
goto
_err
;
...
...
@@ -167,12 +165,11 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
STQ
*
pTq
=
pWriter
->
pTq
;
if
(
rollback
)
{
tdbAbort
(
pWriter
->
pTq
->
pMetaStore
,
pWriter
->
txn
);
ASSERT
(
0
);
tdbAbort
(
pWriter
->
pTq
->
pMetaDB
,
pWriter
->
txn
);
}
else
{
code
=
tdbCommit
(
pWriter
->
pTq
->
pMeta
Store
,
pWriter
->
txn
);
code
=
tdbCommit
(
pWriter
->
pTq
->
pMeta
DB
,
pWriter
->
txn
);
if
(
code
)
goto
_err
;
code
=
tdbPostCommit
(
pWriter
->
pTq
->
pMeta
Store
,
pWriter
->
txn
);
code
=
tdbPostCommit
(
pWriter
->
pTq
->
pMeta
DB
,
pWriter
->
txn
);
if
(
code
)
goto
_err
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
2a7de0cd
...
...
@@ -768,8 +768,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
tableListGetGroupList
(
pTaskInfo
->
pTableInfoList
,
pInfo
->
currentGroupId
,
&
pList
,
&
num
);
ASSERT
(
pInfo
->
base
.
dataReader
==
NULL
);
int32_t
code
=
tsdbReaderOpen
(
pInfo
->
base
.
readHandle
.
vnode
,
&
pInfo
->
base
.
cond
,
pList
,
num
,
pInfo
->
pResBlock
,
(
STsdbReader
**
)
&
pInfo
->
base
.
dataReader
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
tsdbReaderOpen
(
pInfo
->
base
.
readHandle
.
vnode
,
&
pInfo
->
base
.
cond
,
pList
,
num
,
pInfo
->
pResBlock
,
(
STsdbReader
**
)
&
pInfo
->
base
.
dataReader
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -986,8 +986,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock
*
pBlock
=
pTableScanInfo
->
pResBlock
;
STsdbReader
*
pReader
=
NULL
;
int32_t
code
=
tsdbReaderOpen
(
pTableScanInfo
->
base
.
readHandle
.
vnode
,
&
cond
,
&
tblInfo
,
1
,
pBlock
,
(
STsdbReader
**
)
&
pReader
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
tsdbReaderOpen
(
pTableScanInfo
->
base
.
readHandle
.
vnode
,
&
cond
,
&
tblInfo
,
1
,
pBlock
,
(
STsdbReader
**
)
&
pReader
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
...
...
@@ -995,7 +995,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
}
if
(
tsdbNextDataBlock
(
pReader
))
{
/*SSDataBlock* p = */
tsdbRetrieveDataBlock
(
pReader
,
NULL
);
/*SSDataBlock* p = */
tsdbRetrieveDataBlock
(
pReader
,
NULL
);
doSetTagColumnData
(
&
pTableScanInfo
->
base
,
pBlock
,
pTaskInfo
,
pBlock
->
info
.
rows
);
pBlock
->
info
.
id
.
groupId
=
getTableGroupId
(
pTaskInfo
->
pTableInfoList
,
pBlock
->
info
.
id
.
uid
);
}
...
...
@@ -1224,7 +1224,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
SColumnInfoData
*
pSrcUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pSrcGpCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
srcUidData
=
(
uint64_t
*
)
pSrcUidCol
->
pData
;
uint64_t
*
srcUidData
=
(
uint64_t
*
)
pSrcUidCol
->
pData
;
ASSERT
(
pSrcStartTsCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TSKEY
*
srcStartTsCol
=
(
TSKEY
*
)
pSrcStartTsCol
->
pData
;
TSKEY
*
srcEndTsCol
=
(
TSKEY
*
)
pSrcEndTsCol
->
pData
;
...
...
@@ -1347,6 +1347,36 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return
code
;
}
#if 0
void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTagCalSup = &pInfo->tagCalSup;
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
if (pBlock == NULL || pBlock->info.rows == 0) return;
void* tag = NULL;
int32_t tagLen = 0;
if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) {
pBlock->info.tagLen = tagLen;
void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen);
if (pTag == NULL) {
tdbFree(tag);
taosMemoryFree(pBlock->info.pTag);
pBlock->info.pTag = NULL;
pBlock->info.tagLen = 0;
return;
}
pBlock->info.pTag = pTag;
memcpy(pBlock->info.pTag, tag, tagLen);
tdbFree(tag);
return;
} else {
pBlock->info.pTag = NULL;
}
tdbFree(tag);
}
#endif
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
)
{
SExprSupp
*
pTbNameCalSup
=
&
pInfo
->
tbnameCalSup
;
SStreamState
*
pState
=
pInfo
->
pStreamScanOp
->
pTaskInfo
->
streamInfo
.
pState
;
...
...
@@ -1354,10 +1384,12 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
if
(
pBlock
==
NULL
||
pBlock
->
info
.
rows
==
0
)
return
;
void
*
tbname
=
NULL
;
if
(
streamStateGetParName
(
pInfo
->
pStreamScanOp
->
pTaskInfo
->
streamInfo
.
pState
,
pBlock
->
info
.
id
.
groupId
,
&
tbname
)
<
0
)
{
pBlock
->
info
.
parTbName
[
0
]
=
0
;
}
else
{
if
(
streamStateGetParName
(
pState
,
pBlock
->
info
.
id
.
groupId
,
&
tbname
)
==
0
)
{
memcpy
(
pBlock
->
info
.
parTbName
,
tbname
,
TSDB_TABLE_NAME_LEN
);
tdbFree
(
tbname
);
return
;
}
else
{
pBlock
->
info
.
parTbName
[
0
]
=
0
;
}
tdbFree
(
tbname
);
...
...
@@ -2285,7 +2317,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if
(
pHandle
->
initTableReader
)
{
pTSInfo
->
scanMode
=
TABLE_SCAN__TABLE_ORDER
;
pTSInfo
->
base
.
dataReader
=
NULL
;
code
=
tsdbReaderOpen
(
pHandle
->
vnode
,
&
pTSInfo
->
base
.
cond
,
pList
,
num
,
pTSInfo
->
pResBlock
,
&
pTSInfo
->
base
.
dataReader
,
NULL
);
code
=
tsdbReaderOpen
(
pHandle
->
vnode
,
&
pTSInfo
->
base
.
cond
,
pList
,
num
,
pTSInfo
->
pResBlock
,
&
pTSInfo
->
base
.
dataReader
,
NULL
);
if
(
code
!=
0
)
{
terrno
=
code
;
destroyTableScanOperatorInfo
(
pTableScanOp
);
...
...
@@ -2355,7 +2388,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
__optr_fn_t
nextFn
=
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
?
doStreamScan
:
doQueueScan
;
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
nextFn
,
NULL
,
destroyStreamScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
nextFn
,
NULL
,
destroyStreamScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
...
...
@@ -2492,7 +2526,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doTagScan
,
NULL
,
destroyTagScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doTagScan
,
NULL
,
destroyTagScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
...
...
@@ -2513,11 +2548,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SQueryTableDataCond
*
pQueryCond
=
taosArrayGet
(
pInfo
->
queryConds
,
readIdx
);
int64_t
st
=
taosGetTimestampUs
();
void
*
p
=
tableListGetInfo
(
pTaskInfo
->
pTableInfoList
,
readIdx
+
pInfo
->
tableStartIndex
);
int64_t
st
=
taosGetTimestampUs
();
void
*
p
=
tableListGetInfo
(
pTaskInfo
->
pTableInfoList
,
readIdx
+
pInfo
->
tableStartIndex
);
SReadHandle
*
pHandle
=
&
pInfo
->
base
.
readHandle
;
int32_t
code
=
tsdbReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
p
,
1
,
pBlock
,
&
pInfo
->
base
.
dataReader
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
tsdbReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
p
,
1
,
pBlock
,
&
pInfo
->
base
.
dataReader
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
0
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -2915,8 +2951,8 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo*
SSDataBlock
*
pRes
,
char
*
dbName
);
static
void
buildVnodeFilteredTbCount
(
SOperatorInfo
*
pOperator
,
STableCountScanOperatorInfo
*
pInfo
,
STableCountScanSupp
*
pSupp
,
SSDataBlock
*
pRes
,
char
*
dbName
);
static
void
buildVnodeGroupedTableCount
(
SOperatorInfo
*
pOperator
,
STableCountScanOperatorInfo
*
pInfo
,
STableCountScanSupp
*
pSupp
,
SSDataBlock
*
pRes
,
int32_t
vgId
,
char
*
dbName
);
static
void
buildVnodeGroupedTableCount
(
SOperatorInfo
*
pOperator
,
STableCountScanOperatorInfo
*
pInfo
,
STableCountScanSupp
*
pSupp
,
SSDataBlock
*
pRes
,
int32_t
vgId
,
char
*
dbName
);
static
SSDataBlock
*
buildVnodeDbTableCount
(
SOperatorInfo
*
pOperator
,
STableCountScanOperatorInfo
*
pInfo
,
STableCountScanSupp
*
pSupp
,
SSDataBlock
*
pRes
);
static
void
buildSysDbGroupedTableCount
(
SOperatorInfo
*
pOperator
,
STableCountScanOperatorInfo
*
pInfo
,
...
...
@@ -3041,8 +3077,8 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
setOperatorInfo
(
pOperator
,
"TableCountScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doTableCountScan
,
NULL
,
destoryTableCountScanOperator
,
optrDefaultBufFn
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doTableCountScan
,
NULL
,
destoryTableCountScanOperator
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
_error:
...
...
source/libs/stream/src/streamState.c
浏览文件 @
2a7de0cd
...
...
@@ -159,6 +159,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
goto
_err
;
}
if
(
tdbTbOpen
(
"partag.state.db"
,
sizeof
(
int64_t
),
-
1
,
NULL
,
pState
->
pTdbState
->
db
,
&
pState
->
pTdbState
->
pParTagDb
,
0
)
<
0
)
{
goto
_err
;
}
if
(
streamStateBegin
(
pState
)
<
0
)
{
goto
_err
;
}
...
...
@@ -173,6 +178,7 @@ _err:
tdbTbClose
(
pState
->
pTdbState
->
pFillStateDb
);
tdbTbClose
(
pState
->
pTdbState
->
pSessionStateDb
);
tdbTbClose
(
pState
->
pTdbState
->
pParNameDb
);
tdbTbClose
(
pState
->
pTdbState
->
pParTagDb
);
tdbClose
(
pState
->
pTdbState
->
db
);
streamStateDestroy
(
pState
);
return
NULL
;
...
...
@@ -186,6 +192,7 @@ void streamStateClose(SStreamState* pState) {
tdbTbClose
(
pState
->
pTdbState
->
pFillStateDb
);
tdbTbClose
(
pState
->
pTdbState
->
pSessionStateDb
);
tdbTbClose
(
pState
->
pTdbState
->
pParNameDb
);
tdbTbClose
(
pState
->
pTdbState
->
pParTagDb
);
tdbClose
(
pState
->
pTdbState
->
db
);
streamStateDestroy
(
pState
);
...
...
@@ -821,10 +828,17 @@ _end:
return
res
;
}
int32_t
streamStatePutParTag
(
SStreamState
*
pState
,
int64_t
groupId
,
const
void
*
tag
,
int32_t
tagLen
)
{
return
tdbTbUpsert
(
pState
->
pTdbState
->
pParTagDb
,
&
groupId
,
sizeof
(
int64_t
),
tag
,
tagLen
,
pState
->
pTdbState
->
txn
);
}
int32_t
streamStateGetParTag
(
SStreamState
*
pState
,
int64_t
groupId
,
void
**
tagVal
,
int32_t
*
tagLen
)
{
return
tdbTbGet
(
pState
->
pTdbState
->
pParTagDb
,
&
groupId
,
sizeof
(
int64_t
),
tagVal
,
tagLen
);
}
int32_t
streamStatePutParName
(
SStreamState
*
pState
,
int64_t
groupId
,
const
char
tbname
[
TSDB_TABLE_NAME_LEN
])
{
tdbTbUpsert
(
pState
->
pTdbState
->
pParNameDb
,
&
groupId
,
sizeof
(
int64_t
),
tbname
,
TSDB_TABLE_NAME_LEN
,
pState
->
pTdbState
->
txn
);
return
0
;
return
tdbTbUpsert
(
pState
->
pTdbState
->
pParNameDb
,
&
groupId
,
sizeof
(
int64_t
),
tbname
,
TSDB_TABLE_NAME_LEN
,
pState
->
pTdbState
->
txn
);
}
int32_t
streamStateGetParName
(
SStreamState
*
pState
,
int64_t
groupId
,
void
**
pVal
)
{
...
...
source/libs/wal/src/walRef.c
浏览文件 @
2a7de0cd
...
...
@@ -80,6 +80,7 @@ void walUnrefVer(SWalRef *pRef) {
SWalRef
*
walRefCommittedVer
(
SWal
*
pWal
)
{
SWalRef
*
pRef
=
walOpenRef
(
pWal
);
if
(
pRef
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
taosThreadMutexLock
(
&
pWal
->
mutex
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录