Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
af18697c
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
af18697c
编写于
7月 20, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/avoidRetryMemleak
上级
503e89f0
cfe00471
变更
14
显示空白变更内容
内联
并排
Showing
14 changed file
with
280 addition
and
26 deletion
+280
-26
include/libs/wal/wal.h
include/libs/wal/wal.h
+5
-1
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+4
-0
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+2
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+2
-3
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+2
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+106
-11
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+20
-0
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+2
-0
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+8
-2
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+6
-0
tests/script/tsim/stream/basic1.sim
tests/script/tsim/stream/basic1.sim
+107
-0
tests/script/tsim/stream/sliding.sim
tests/script/tsim/stream/sliding.sim
+14
-6
tests/system-test/2-query/last_row.py
tests/system-test/2-query/last_row.py
+1
-1
tests/system-test/7-tmq/tmqAutoCreateTbl.py
tests/system-test/7-tmq/tmqAutoCreateTbl.py
+1
-1
未找到文件。
include/libs/wal/wal.h
浏览文件 @
af18697c
...
@@ -64,6 +64,7 @@ typedef struct {
...
@@ -64,6 +64,7 @@ typedef struct {
int64_t
verInSnapshotting
;
int64_t
verInSnapshotting
;
int64_t
snapshotVer
;
int64_t
snapshotVer
;
int64_t
commitVer
;
int64_t
commitVer
;
int64_t
appliedVer
;
int64_t
lastVer
;
int64_t
lastVer
;
}
SWalVer
;
}
SWalVer
;
...
@@ -172,6 +173,9 @@ int32_t walRollback(SWal *, int64_t ver);
...
@@ -172,6 +173,9 @@ int32_t walRollback(SWal *, int64_t ver);
int32_t
walBeginSnapshot
(
SWal
*
,
int64_t
ver
);
int32_t
walBeginSnapshot
(
SWal
*
,
int64_t
ver
);
int32_t
walEndSnapshot
(
SWal
*
);
int32_t
walEndSnapshot
(
SWal
*
);
int32_t
walRestoreFromSnapshot
(
SWal
*
,
int64_t
ver
);
int32_t
walRestoreFromSnapshot
(
SWal
*
,
int64_t
ver
);
// for tq
int32_t
walApplyVer
(
SWal
*
,
int64_t
ver
);
// int32_t walDataCorrupted(SWal*);
// int32_t walDataCorrupted(SWal*);
// read
// read
...
@@ -186,7 +190,6 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
...
@@ -186,7 +190,6 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
);
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
);
int32_t
walFetchBody
(
SWalReader
*
pRead
,
SWalCkHead
**
ppHead
);
int32_t
walFetchBody
(
SWalReader
*
pRead
,
SWalCkHead
**
ppHead
);
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
,
const
SWalCkHead
*
pHead
);
int32_t
walSkipFetchBody
(
SWalReader
*
pRead
,
const
SWalCkHead
*
pHead
);
typedef
struct
{
typedef
struct
{
int64_t
refId
;
int64_t
refId
;
int64_t
ver
;
int64_t
ver
;
...
@@ -206,6 +209,7 @@ int64_t walGetFirstVer(SWal *);
...
@@ -206,6 +209,7 @@ int64_t walGetFirstVer(SWal *);
int64_t
walGetSnapshotVer
(
SWal
*
);
int64_t
walGetSnapshotVer
(
SWal
*
);
int64_t
walGetLastVer
(
SWal
*
);
int64_t
walGetLastVer
(
SWal
*
);
int64_t
walGetCommittedVer
(
SWal
*
);
int64_t
walGetCommittedVer
(
SWal
*
);
int64_t
walGetAppliedVer
(
SWal
*
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
af18697c
...
@@ -265,6 +265,10 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
...
@@ -265,6 +265,10 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int64_t
consumerId
=
be64toh
(
pReq
->
consumerId
);
int64_t
consumerId
=
be64toh
(
pReq
->
consumerId
);
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_MND_CONSUMER_NOT_EXIST
;
return
-
1
;
}
atomic_store_32
(
&
pConsumer
->
hbStatus
,
0
);
atomic_store_32
(
&
pConsumer
->
hbStatus
,
0
);
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
af18697c
...
@@ -237,6 +237,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
...
@@ -237,6 +237,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
#endif
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
walApplyVer
(
pTq
->
pVnode
->
pWal
,
ver
);
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
...
@@ -253,4 +255,3 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
...
@@ -253,4 +255,3 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
return
0
;
return
0
;
}
}
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
af18697c
...
@@ -1455,7 +1455,6 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer
...
@@ -1455,7 +1455,6 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer
(
pBlock
->
minVersion
<=
pVerRange
->
maxVer
);
(
pBlock
->
minVersion
<=
pVerRange
->
maxVer
);
}
}
static
bool
doCheckforDatablockOverlap
(
STableBlockScanInfo
*
pBlockScanInfo
,
const
SBlock
*
pBlock
)
{
static
bool
doCheckforDatablockOverlap
(
STableBlockScanInfo
*
pBlockScanInfo
,
const
SBlock
*
pBlock
)
{
size_t
num
=
taosArrayGetSize
(
pBlockScanInfo
->
delSkyline
);
size_t
num
=
taosArrayGetSize
(
pBlockScanInfo
->
delSkyline
);
...
@@ -1507,7 +1506,7 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl
...
@@ -1507,7 +1506,7 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl
return
doCheckforDatablockOverlap
(
pBlockScanInfo
,
pBlock
);
return
doCheckforDatablockOverlap
(
pBlockScanInfo
,
pBlock
);
}
else
{
}
else
{
int32_t
index
=
pBlockScanInfo
->
fileDelIndex
;
int32_t
index
=
pBlockScanInfo
->
fileDelIndex
;
while
(
1
)
{
while
(
1
)
{
TSDBKEY
*
p
=
taosArrayGet
(
pBlockScanInfo
->
delSkyline
,
index
);
TSDBKEY
*
p
=
taosArrayGet
(
pBlockScanInfo
->
delSkyline
,
index
);
if
(
p
->
ts
>
pBlock
->
minKey
.
ts
&&
index
>
0
)
{
if
(
p
->
ts
>
pBlock
->
minKey
.
ts
&&
index
>
0
)
{
index
-=
1
;
index
-=
1
;
...
@@ -2808,7 +2807,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
...
@@ -2808,7 +2807,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
if
(
pCond
->
suid
!=
0
)
{
if
(
pCond
->
suid
!=
0
)
{
(
*
ppReader
)
->
pSchema
=
metaGetTbTSchema
((
*
ppReader
)
->
pTsdb
->
pVnode
->
pMeta
,
(
*
ppReader
)
->
suid
,
-
1
);
(
*
ppReader
)
->
pSchema
=
metaGetTbTSchema
((
*
ppReader
)
->
pTsdb
->
pVnode
->
pMeta
,
(
*
ppReader
)
->
suid
,
-
1
);
ASSERT
((
*
ppReader
)
->
pSchema
);
//
ASSERT((*ppReader)->pSchema);
}
else
if
(
taosArrayGetSize
(
pTableList
)
>
0
)
{
}
else
if
(
taosArrayGetSize
(
pTableList
)
>
0
)
{
STableKeyInfo
*
pKey
=
taosArrayGet
(
pTableList
,
0
);
STableKeyInfo
*
pKey
=
taosArrayGet
(
pTableList
,
0
);
(
*
ppReader
)
->
pSchema
=
metaGetTbTSchema
((
*
ppReader
)
->
pTsdb
->
pVnode
->
pMeta
,
pKey
->
uid
,
-
1
);
(
*
ppReader
)
->
pSchema
=
metaGetTbTSchema
((
*
ppReader
)
->
pTsdb
->
pVnode
->
pMeta
,
pKey
->
uid
,
-
1
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
af18697c
...
@@ -878,6 +878,8 @@ _exit:
...
@@ -878,6 +878,8 @@ _exit:
tdProcessRSmaSubmit
(
pVnode
->
pSma
,
pReq
,
STREAM_INPUT__DATA_SUBMIT
);
tdProcessRSmaSubmit
(
pVnode
->
pSma
,
pReq
,
STREAM_INPUT__DATA_SUBMIT
);
}
}
vDebug
(
"successful submit in vg %d version %ld"
,
pVnode
->
config
.
vgId
,
version
);
return
0
;
return
0
;
}
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
af18697c
...
@@ -660,6 +660,62 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag) {
...
@@ -660,6 +660,62 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag) {
taosMemoryFree
(
pBuf
);
taosMemoryFree
(
pBuf
);
}
}
typedef
int32_t
(
*
__compare_fn_t
)(
void
*
pKey
,
void
*
data
,
int32_t
index
);
int32_t
binarySearchCom
(
void
*
keyList
,
int
num
,
void
*
pKey
,
int
order
,
__compare_fn_t
comparefn
)
{
int
firstPos
=
0
,
lastPos
=
num
-
1
,
midPos
=
-
1
;
int
numOfRows
=
0
;
if
(
num
<=
0
)
return
-
1
;
if
(
order
==
TSDB_ORDER_DESC
)
{
// find the first position which is smaller or equal than the key
while
(
1
)
{
if
(
comparefn
(
pKey
,
keyList
,
lastPos
)
>=
0
)
return
lastPos
;
if
(
comparefn
(
pKey
,
keyList
,
firstPos
)
==
0
)
return
firstPos
;
if
(
comparefn
(
pKey
,
keyList
,
firstPos
)
<
0
)
return
firstPos
-
1
;
numOfRows
=
lastPos
-
firstPos
+
1
;
midPos
=
(
numOfRows
>>
1
)
+
firstPos
;
if
(
comparefn
(
pKey
,
keyList
,
midPos
)
<
0
)
{
lastPos
=
midPos
-
1
;
}
else
if
(
comparefn
(
pKey
,
keyList
,
midPos
)
>
0
)
{
firstPos
=
midPos
+
1
;
}
else
{
break
;
}
}
}
else
{
// find the first position which is bigger or equal than the key
while
(
1
)
{
if
(
comparefn
(
pKey
,
keyList
,
firstPos
)
<=
0
)
return
firstPos
;
if
(
comparefn
(
pKey
,
keyList
,
lastPos
)
==
0
)
return
lastPos
;
if
(
comparefn
(
pKey
,
keyList
,
lastPos
)
>
0
)
{
lastPos
=
lastPos
+
1
;
if
(
lastPos
>=
num
)
return
-
1
;
else
return
lastPos
;
}
numOfRows
=
lastPos
-
firstPos
+
1
;
midPos
=
(
numOfRows
>>
1
)
+
firstPos
;
if
(
comparefn
(
pKey
,
keyList
,
midPos
)
<
0
)
{
lastPos
=
midPos
-
1
;
}
else
if
(
comparefn
(
pKey
,
keyList
,
midPos
)
>
0
)
{
firstPos
=
midPos
+
1
;
}
else
{
break
;
}
}
}
return
midPos
;
}
typedef
int64_t
(
*
__get_value_fn_t
)(
void
*
data
,
int32_t
index
);
typedef
int64_t
(
*
__get_value_fn_t
)(
void
*
data
,
int32_t
index
);
int32_t
binarySearch
(
void
*
keyList
,
int
num
,
TSKEY
key
,
int
order
,
__get_value_fn_t
getValuefn
)
{
int32_t
binarySearch
(
void
*
keyList
,
int
num
,
TSKEY
key
,
int
order
,
__get_value_fn_t
getValuefn
)
{
...
@@ -722,14 +778,31 @@ int64_t getReskey(void* data, int32_t index) {
...
@@ -722,14 +778,31 @@ int64_t getReskey(void* data, int32_t index) {
return
*
(
int64_t
*
)
pos
->
key
;
return
*
(
int64_t
*
)
pos
->
key
;
}
}
int32_t
compareResKey
(
void
*
pKey
,
void
*
data
,
int32_t
index
)
{
SArray
*
res
=
(
SArray
*
)
data
;
SResKeyPos
*
pos
=
taosArrayGetP
(
res
,
index
);
SWinRes
*
pData
=
(
SWinRes
*
)
pKey
;
if
(
pData
->
ts
==
*
(
int64_t
*
)
pos
->
key
)
{
if
(
pData
->
groupId
>
pos
->
groupId
)
{
return
1
;
}
else
if
(
pData
->
groupId
<
pos
->
groupId
)
{
return
-
1
;
}
return
0
;
}
else
if
(
pData
->
ts
>
*
(
int64_t
*
)
pos
->
key
)
{
return
1
;
}
return
-
1
;
}
static
int32_t
saveResult
(
int64_t
ts
,
int32_t
pageId
,
int32_t
offset
,
uint64_t
groupId
,
SArray
*
pUpdated
)
{
static
int32_t
saveResult
(
int64_t
ts
,
int32_t
pageId
,
int32_t
offset
,
uint64_t
groupId
,
SArray
*
pUpdated
)
{
int32_t
size
=
taosArrayGetSize
(
pUpdated
);
int32_t
size
=
taosArrayGetSize
(
pUpdated
);
int32_t
index
=
binarySearch
(
pUpdated
,
size
,
ts
,
TSDB_ORDER_DESC
,
getReskey
);
SWinRes
data
=
{.
ts
=
ts
,
.
groupId
=
groupId
};
int32_t
index
=
binarySearchCom
(
pUpdated
,
size
,
&
data
,
TSDB_ORDER_DESC
,
compareResKey
);
if
(
index
==
-
1
)
{
if
(
index
==
-
1
)
{
index
=
0
;
index
=
0
;
}
else
{
}
else
{
TSKEY
resTs
=
getReskey
(
pUpdated
,
index
);
if
(
compareResKey
(
&
data
,
pUpdated
,
index
)
>
0
)
{
if
(
resTs
<
ts
)
{
index
++
;
index
++
;
}
else
{
}
else
{
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -753,10 +826,10 @@ static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpda
...
@@ -753,10 +826,10 @@ static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpda
return
saveResult
(
result
->
win
.
skey
,
result
->
pageId
,
result
->
offset
,
groupId
,
pUpdated
);
return
saveResult
(
result
->
win
.
skey
,
result
->
pageId
,
result
->
offset
,
groupId
,
pUpdated
);
}
}
static
void
removeResult
(
SArray
*
pUpdated
,
TSKEY
k
ey
)
{
static
void
removeResult
(
SArray
*
pUpdated
,
SWinRes
*
pK
ey
)
{
int32_t
size
=
taosArrayGetSize
(
pUpdated
);
int32_t
size
=
taosArrayGetSize
(
pUpdated
);
int32_t
index
=
binarySearch
(
pUpdated
,
size
,
key
,
TSDB_ORDER_DESC
,
getResk
ey
);
int32_t
index
=
binarySearch
Com
(
pUpdated
,
size
,
pKey
,
TSDB_ORDER_DESC
,
compareResK
ey
);
if
(
index
>=
0
&&
key
==
getReskey
(
pUpdated
,
index
))
{
if
(
index
>=
0
&&
0
==
compareResKey
(
pKey
,
pUpdated
,
index
))
{
taosArrayRemove
(
pUpdated
,
index
);
taosArrayRemove
(
pUpdated
,
index
);
}
}
}
}
...
@@ -765,7 +838,7 @@ static void removeResults(SArray* pWins, SArray* pUpdated) {
...
@@ -765,7 +838,7 @@ static void removeResults(SArray* pWins, SArray* pUpdated) {
int32_t
size
=
taosArrayGetSize
(
pWins
);
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SWinRes
*
pW
=
taosArrayGet
(
pWins
,
i
);
SWinRes
*
pW
=
taosArrayGet
(
pWins
,
i
);
removeResult
(
pUpdated
,
pW
->
ts
);
removeResult
(
pUpdated
,
pW
);
}
}
}
}
...
@@ -775,14 +848,30 @@ int64_t getWinReskey(void* data, int32_t index) {
...
@@ -775,14 +848,30 @@ int64_t getWinReskey(void* data, int32_t index) {
return
pos
->
ts
;
return
pos
->
ts
;
}
}
int32_t
compareWinRes
(
void
*
pKey
,
void
*
data
,
int32_t
index
)
{
SArray
*
res
=
(
SArray
*
)
data
;
SWinRes
*
pos
=
taosArrayGetP
(
res
,
index
);
SResKeyPos
*
pData
=
(
SResKeyPos
*
)
pKey
;
if
(
*
(
int64_t
*
)
pData
->
key
==
pos
->
ts
)
{
if
(
pData
->
groupId
>
pos
->
groupId
)
{
return
1
;
}
else
if
(
pData
->
groupId
<
pos
->
groupId
)
{
return
-
1
;
}
return
0
;
}
else
if
(
*
(
int64_t
*
)
pData
->
key
>
pos
->
ts
)
{
return
1
;
}
return
-
1
;
}
static
void
removeDeleteResults
(
SArray
*
pUpdated
,
SArray
*
pDelWins
)
{
static
void
removeDeleteResults
(
SArray
*
pUpdated
,
SArray
*
pDelWins
)
{
int32_t
upSize
=
taosArrayGetSize
(
pUpdated
);
int32_t
upSize
=
taosArrayGetSize
(
pUpdated
);
int32_t
delSize
=
taosArrayGetSize
(
pDelWins
);
int32_t
delSize
=
taosArrayGetSize
(
pDelWins
);
for
(
int32_t
i
=
0
;
i
<
upSize
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
upSize
;
i
++
)
{
SResKeyPos
*
pResKey
=
taosArrayGetP
(
pUpdated
,
i
);
SResKeyPos
*
pResKey
=
taosArrayGetP
(
pUpdated
,
i
);
int64_t
key
=
*
(
int64_t
*
)
pResKey
->
key
;
int32_t
index
=
binarySearchCom
(
pDelWins
,
delSize
,
pResKey
,
TSDB_ORDER_DESC
,
compareWinRes
);
int32_t
index
=
binarySearch
(
pDelWins
,
delSize
,
key
,
TSDB_ORDER_DESC
,
getWinReskey
);
if
(
index
>=
0
&&
0
==
compareWinRes
(
pResKey
,
pDelWins
,
index
))
{
if
(
index
>=
0
&&
key
==
getWinReskey
(
pDelWins
,
index
))
{
taosArrayRemove
(
pDelWins
,
index
);
taosArrayRemove
(
pDelWins
,
index
);
}
}
}
}
...
@@ -924,11 +1013,17 @@ SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SRe
...
@@ -924,11 +1013,17 @@ SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SRe
int64_t
*
extractTsCol
(
SSDataBlock
*
pBlock
,
const
SIntervalAggOperatorInfo
*
pInfo
)
{
int64_t
*
extractTsCol
(
SSDataBlock
*
pBlock
,
const
SIntervalAggOperatorInfo
*
pInfo
)
{
TSKEY
*
tsCols
=
NULL
;
TSKEY
*
tsCols
=
NULL
;
if
(
pBlock
->
pDataBlock
!=
NULL
)
{
if
(
pBlock
->
pDataBlock
!=
NULL
)
{
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
SColumnInfoData
*
pColDataInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
tsCols
=
(
int64_t
*
)
pColDataInfo
->
pData
;
if
(
tsCols
!=
NULL
)
{
// no data in primary ts
if
(
tsCols
[
0
]
==
0
&&
tsCols
[
pBlock
->
info
.
rows
-
1
]
==
0
)
{
return
NULL
;
}
if
(
tsCols
[
0
]
!=
0
&&
(
pBlock
->
info
.
window
.
skey
==
0
&&
pBlock
->
info
.
window
.
ekey
==
0
))
{
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsIndex
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsIndex
);
}
}
}
}
...
...
source/libs/function/src/builtins.c
浏览文件 @
af18697c
...
@@ -1425,6 +1425,17 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
...
@@ -1425,6 +1425,17 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
}
}
static
int32_t
translateFirstLast
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
static
int32_t
translateFirstLast
(
SFunctionNode
*
pFunc
,
char
*
pErrBuf
,
int32_t
len
)
{
// forbid null as first/last input, since first(c0, null, 1) may have different number of input
int32_t
numOfParams
=
LIST_LENGTH
(
pFunc
->
pParameterList
);
for
(
int32_t
i
=
0
;
i
<
numOfParams
;
++
i
)
{
uint8_t
nodeType
=
nodeType
(
nodesListGetNode
(
pFunc
->
pParameterList
,
i
));
uint8_t
paraType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
i
))
->
resType
.
type
;
if
(
IS_NULL_TYPE
(
paraType
)
&&
QUERY_NODE_VALUE
==
nodeType
)
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
}
pFunc
->
node
.
resType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
;
pFunc
->
node
.
resType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
0
))
->
resType
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -1435,6 +1446,15 @@ static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32
...
@@ -1435,6 +1446,15 @@ static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32
uint8_t
paraType
=
((
SExprNode
*
)
pPara
)
->
resType
.
type
;
uint8_t
paraType
=
((
SExprNode
*
)
pPara
)
->
resType
.
type
;
int32_t
paraBytes
=
((
SExprNode
*
)
pPara
)
->
resType
.
bytes
;
int32_t
paraBytes
=
((
SExprNode
*
)
pPara
)
->
resType
.
bytes
;
if
(
isPartial
)
{
if
(
isPartial
)
{
int32_t
numOfParams
=
LIST_LENGTH
(
pFunc
->
pParameterList
);
for
(
int32_t
i
=
0
;
i
<
numOfParams
;
++
i
)
{
uint8_t
nodeType
=
nodeType
(
nodesListGetNode
(
pFunc
->
pParameterList
,
i
));
uint8_t
pType
=
((
SExprNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
i
))
->
resType
.
type
;
if
(
IS_NULL_TYPE
(
pType
)
&&
QUERY_NODE_VALUE
==
nodeType
)
{
return
invaildFuncParaTypeErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
}
pFunc
->
node
.
resType
=
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
getFirstLastInfoSize
(
paraBytes
)
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
};
(
SDataType
){.
bytes
=
getFirstLastInfoSize
(
paraBytes
)
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
};
}
else
{
}
else
{
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
af18697c
...
@@ -33,6 +33,8 @@ int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
...
@@ -33,6 +33,8 @@ int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
int64_t
FORCE_INLINE
walGetCommittedVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
commitVer
;
}
int64_t
FORCE_INLINE
walGetCommittedVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
commitVer
;
}
int64_t
FORCE_INLINE
walGetAppliedVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
appliedVer
;
}
static
FORCE_INLINE
int
walBuildMetaName
(
SWal
*
pWal
,
int
metaVer
,
char
*
buf
)
{
static
FORCE_INLINE
int
walBuildMetaName
(
SWal
*
pWal
,
int
metaVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/meta-ver%d"
,
pWal
->
path
,
metaVer
);
return
sprintf
(
buf
,
"%s/meta-ver%d"
,
pWal
->
path
,
metaVer
);
}
}
...
...
source/libs/wal/src/walRead.c
浏览文件 @
af18697c
...
@@ -66,9 +66,15 @@ void walCloseReader(SWalReader *pRead) {
...
@@ -66,9 +66,15 @@ void walCloseReader(SWalReader *pRead) {
}
}
int32_t
walNextValidMsg
(
SWalReader
*
pRead
)
{
int32_t
walNextValidMsg
(
SWalReader
*
pRead
)
{
wDebug
(
"vgId:%d wal start to fetch"
,
pRead
->
pWal
->
cfg
.
vgId
);
int64_t
fetchVer
=
pRead
->
curVersion
;
int64_t
fetchVer
=
pRead
->
curVersion
;
int64_t
endVer
=
pRead
->
cond
.
scanUncommited
?
walGetLastVer
(
pRead
->
pWal
)
:
walGetCommittedVer
(
pRead
->
pWal
);
int64_t
lastVer
=
walGetLastVer
(
pRead
->
pWal
);
int64_t
committedVer
=
walGetCommittedVer
(
pRead
->
pWal
);
int64_t
appliedVer
=
walGetAppliedVer
(
pRead
->
pWal
);
int64_t
endVer
=
pRead
->
cond
.
scanUncommited
?
lastVer
:
committedVer
;
endVer
=
TMIN
(
appliedVer
,
endVer
);
wDebug
(
"vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld"
,
pRead
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
appliedVer
,
endVer
);
while
(
fetchVer
<=
endVer
)
{
while
(
fetchVer
<=
endVer
)
{
if
(
walFetchHeadNew
(
pRead
,
fetchVer
)
<
0
)
{
if
(
walFetchHeadNew
(
pRead
,
fetchVer
)
<
0
)
{
return
-
1
;
return
-
1
;
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
af18697c
...
@@ -64,6 +64,12 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
...
@@ -64,6 +64,12 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
return
0
;
return
0
;
}
}
int32_t
walApplyVer
(
SWal
*
pWal
,
int64_t
ver
)
{
// TODO: error check
pWal
->
vers
.
appliedVer
=
ver
;
return
0
;
}
int32_t
walCommit
(
SWal
*
pWal
,
int64_t
ver
)
{
int32_t
walCommit
(
SWal
*
pWal
,
int64_t
ver
)
{
ASSERT
(
pWal
->
vers
.
commitVer
>=
pWal
->
vers
.
snapshotVer
);
ASSERT
(
pWal
->
vers
.
commitVer
>=
pWal
->
vers
.
snapshotVer
);
ASSERT
(
pWal
->
vers
.
commitVer
<=
pWal
->
vers
.
lastVer
);
ASSERT
(
pWal
->
vers
.
commitVer
<=
pWal
->
vers
.
lastVer
);
...
...
tests/script/tsim/stream/basic1.sim
浏览文件 @
af18697c
...
@@ -462,6 +462,113 @@ if $data25 != 3 then
...
@@ -462,6 +462,113 @@ if $data25 != 3 then
return -1
return -1
endi
endi
sql create database test2 vgroups 1
sql show databases
sql use test2
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(2,2,2);
sql create table t4 using st tags(2,2,2);
sql create table t5 using st tags(2,2,2);
sql create stream streams2 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3,max(b) c4 from st partition by tbname interval(10s)
sql insert into t1 values(1648791213000,1,1,1,1.0) t2 values(1648791213000,2,2,2,2.0) t3 values(1648791213000,3,3,3,3.0) t4 values(1648791213000,4,4,4,4.0);
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt;
if $rows != 4 then
print =====rows=$rows
goto loop0
endi
sql insert into t1 values(1648791213000,5,5,5,5.0) t2 values(1648791213000,6,6,6,6.0) t5 values(1648791213000,7,7,7,7.0);
$loop_count = 0
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt order by c4 desc;
if $rows != 5 then
print =====rows=$rows
goto loop1
endi
# row 0
if $data01 != 1 then
print =====data01=$data01
goto loop1
endi
if $data02 != 7 then
print =====data02=$data02
goto loop1
endi
# row 1
if $data11 != 1 then
print =====data11=$data11
goto loop1
endi
if $data12 != 6 then
print =====data12=$data12
goto loop1
endi
# row 2
if $data21 != 1 then
print =====data21=$data21
goto loop1
endi
if $data22 != 5 then
print =====data22=$data22
goto loop1
endi
sql insert into t1 values(1648791213000,8,8,8,8.0);
$loop_count = 0
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt order by c4 desc;
# row 0
if $data01 != 1 then
print =====data01=$data01
goto loop2
endi
if $data02 != 8 then
print =====data02=$data02
goto loop2
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/tsim/stream/sliding.sim
浏览文件 @
af18697c
...
@@ -366,18 +366,21 @@ if $data32 != 8 then
...
@@ -366,18 +366,21 @@ if $data32 != 8 then
goto loop1
goto loop1
endi
endi
#$loop_all = 0
#looptest:
sql drop database IF EXISTS test2;
sql drop database IF EXISTS test2;
sql drop stream IF EXISTS streams21;
sql drop stream IF EXISTS streams21;
sql drop stream IF EXISTS streams22;
sql drop stream IF EXISTS streams22;
sql create database test2 vgroups
2
;
sql create database test2 vgroups
6
;
sql use test2;
sql use test2;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t2 using st tags(2,2,2);
sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sql create stream streams21 trigger at_once into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s
, 5s
);
sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
sql create stream streams22 trigger at_once into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s
, 5s
);
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);
sql insert into t1 values(1648791223001,2,2,2,1.1);
...
@@ -394,7 +397,7 @@ sql insert into t2 values(1648791213004,4,10,10,4.1);
...
@@ -394,7 +397,7 @@ sql insert into t2 values(1648791213004,4,10,10,4.1);
$loop_count = 0
$loop_count = 0
loop2:
loop2:
sleep
3
00
sleep
1
00
$loop_count = $loop_count + 1
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 10 then
...
@@ -452,7 +455,7 @@ print step 6
...
@@ -452,7 +455,7 @@ print step 6
$loop_count = 0
$loop_count = 0
loop3:
loop3:
sleep 300
#
sleep 300
$loop_count = $loop_count + 1
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 10 then
...
@@ -464,7 +467,7 @@ sql select * from streamt2;
...
@@ -464,7 +467,7 @@ sql select * from streamt2;
# row 0
# row 0
if $data01 != 4 then
if $data01 != 4 then
print =====data01=$data01
print =====data01=$data01
#
goto loop3
goto loop3
endi
endi
if $data02 != 10 then
if $data02 != 10 then
...
@@ -505,4 +508,9 @@ if $data32 != 8 then
...
@@ -505,4 +508,9 @@ if $data32 != 8 then
goto loop3
goto loop3
endi
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
#goto looptest
system sh/stop_dnodes.sh
system sh/stop_dnodes.sh
\ No newline at end of file
tests/system-test/2-query/last_row.py
浏览文件 @
af18697c
...
@@ -221,7 +221,7 @@ class TDTestCase:
...
@@ -221,7 +221,7 @@ class TDTestCase:
tdSql
.
execute
(
"use testdb"
)
tdSql
.
execute
(
"use testdb"
)
# bug need fix
# bug need fix
tdSql
.
query
(
"select last_row(c1 ,NULL) from testdb.t1"
)
tdSql
.
error
(
"select last_row(c1 ,NULL) from testdb.t1"
)
error_sql_lists
=
[
error_sql_lists
=
[
"select last_row from testdb.t1"
,
"select last_row from testdb.t1"
,
...
...
tests/system-test/7-tmq/tmqAutoCreateTbl.py
浏览文件 @
af18697c
...
@@ -225,7 +225,7 @@ class TDTestCase:
...
@@ -225,7 +225,7 @@ class TDTestCase:
tdSql
.
prepare
()
tdSql
.
prepare
()
self
.
prepareTestEnv
()
self
.
prepareTestEnv
()
self
.
tmqCase1
()
self
.
tmqCase1
()
# self.tmqCase2() TD-17267
# self.tmqCase2()
#
TD-17267
def
stop
(
self
):
def
stop
(
self
):
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录