Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8356533e
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8356533e
编写于
5月 09, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'enh/standVer' into enh/rocksdbSstateMerge
上级
711c4756
d0e26d2e
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
406 addition
and
141 deletion
+406
-141
docs/en/12-taos-sql/24-show.md
docs/en/12-taos-sql/24-show.md
+1
-1
docs/examples/rust/nativeexample/examples/subscribe_demo.rs
docs/examples/rust/nativeexample/examples/subscribe_demo.rs
+1
-1
docs/zh/12-taos-sql/24-show.md
docs/zh/12-taos-sql/24-show.md
+1
-1
include/libs/wal/wal.h
include/libs/wal/wal.h
+1
-1
source/client/src/clientSml.c
source/client/src/clientSml.c
+3
-1
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+4
-4
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+29
-21
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+25
-32
source/dnode/vnode/src/tq/tqUtil.c
source/dnode/vnode/src/tq/tqUtil.c
+8
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+23
-6
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+1
-1
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+34
-1
source/libs/parser/test/parInitialCTest.cpp
source/libs/parser/test/parInitialCTest.cpp
+4
-0
source/libs/scalar/test/CMakeLists.txt
source/libs/scalar/test/CMakeLists.txt
+1
-1
source/libs/scalar/test/filter/filterTests.cpp
source/libs/scalar/test/filter/filterTests.cpp
+123
-0
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+5
-0
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+2
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+15
-14
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+1
-1
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+11
-6
source/util/src/tcompare.c
source/util/src/tcompare.c
+42
-28
tests/pytest/auto_crash_gen.py
tests/pytest/auto_crash_gen.py
+23
-6
tests/pytest/auto_crash_gen_valgrind.py
tests/pytest/auto_crash_gen_valgrind.py
+24
-7
tests/pytest/auto_crash_gen_valgrind_cluster.py
tests/pytest/auto_crash_gen_valgrind_cluster.py
+23
-6
tests/system-test/win-test-file
tests/system-test/win-test-file
+1
-1
未找到文件。
docs/en/12-taos-sql/24-show.md
浏览文件 @
8356533e
...
...
@@ -189,7 +189,7 @@ show table distributed d0\G;
<summary>
Show Example
</summary>
<pre><code>
*************************** 1.row **************************
*
_block_
dist: Total_Blocks=[5] Total_Size=[93.65 K
b] Average_size=[18.73 Kb
] Compression_Ratio=[23.98 %]
_block_
dist: Total_Blocks=[5] Total_Size=[93.65 K
B] Average_size=[18.73 KB
] Compression_Ratio=[23.98 %]
Total_Blocks : Table
`d0`
contains total 5 blocks
...
...
docs/examples/rust/nativeexample/examples/subscribe_demo.rs
浏览文件 @
8356533e
...
...
@@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
taos
.exec_many
([
format!
(
"DROP TOPIC IF EXISTS tmq_meters"
),
format!
(
"DROP DATABASE IF EXISTS `{db}`"
),
format!
(
"CREATE DATABASE `{db}`"
),
format!
(
"CREATE DATABASE `{db}`
WAL_RETENTION_PERIOD 3600
"
),
format!
(
"USE `{db}`"
),
// create super table
format!
(
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"
),
...
...
docs/zh/12-taos-sql/24-show.md
浏览文件 @
8356533e
...
...
@@ -189,7 +189,7 @@ SHOW TABLE DISTRIBUTED table_name;
*************************** 1.row **************************
*
_block_
dist: Total_Blocks=[5] Total_Size=[93.65 K
b] Average_size=[18.73 Kb
] Compression_Ratio=[23.98 %]
_block_
dist: Total_Blocks=[5] Total_Size=[93.65 K
B] Average_size=[18.73 KB
] Compression_Ratio=[23.98 %]
Total_Blocks: 表 d0 占用的 block 个数为 5 个
...
...
include/libs/wal/wal.h
浏览文件 @
8356533e
...
...
@@ -132,7 +132,7 @@ typedef struct {
}
SWalRef
;
typedef
struct
{
int8_t
scanUncommited
;
//
int8_t scanUncommited;
int8_t
scanNotApplied
;
int8_t
scanMeta
;
int8_t
enableRef
;
...
...
source/client/src/clientSml.c
浏览文件 @
8356533e
...
...
@@ -1580,7 +1580,9 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
code
=
smlModifyDBSchemas
(
info
);
if
(
code
==
0
||
code
==
TSDB_CODE_SML_INVALID_DATA
||
code
==
TSDB_CODE_PAR_TOO_MANY_COLUMNS
||
code
==
TSDB_CODE_PAR_INVALID_TAGS_NUM
||
code
==
TSDB_CODE_PAR_INVALID_TAGS_LENGTH
||
code
==
TSDB_CODE_PAR_INVALID_ROW_LENGTH
)
break
;
||
code
==
TSDB_CODE_PAR_INVALID_ROW_LENGTH
||
code
==
TSDB_CODE_MND_FIELD_VALUE_OVERFLOW
)
{
break
;
}
taosMsleep
(
100
);
uInfo
(
"SML:0x%"
PRIx64
" smlModifyDBSchemas retry code:%s, times:%d"
,
info
->
id
,
tstrerror
(
code
),
retryNum
);
}
while
(
retryNum
++
<
taosHashGetSize
(
info
->
superTables
)
*
MAX_RETRY_TIMES
);
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
8356533e
...
...
@@ -932,7 +932,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
return
-
1
;
}
if
(
pDst
->
nextColId
<
0
&&
pDst
->
nextColId
>=
0x7fff
-
pDst
->
numOfColumns
-
pDst
->
numOfTags
){
if
(
pDst
->
nextColId
<
0
||
pDst
->
nextColId
>=
0x7fff
-
pDst
->
numOfColumns
-
pDst
->
numOfTags
){
terrno
=
TSDB_CODE_MND_FIELD_VALUE_OVERFLOW
;
return
-
1
;
}
...
...
@@ -1163,8 +1163,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
if
(
mndAllocStbSchemas
(
pOld
,
pNew
)
!=
0
)
{
return
-
1
;
}
if
(
pNew
->
nextColId
<
0
&&
pNew
->
nextColId
>=
0x7fff
-
ntags
){
if
(
pNew
->
nextColId
<
0
||
pNew
->
nextColId
>=
0x7fff
-
ntags
){
terrno
=
TSDB_CODE_MND_FIELD_VALUE_OVERFLOW
;
return
-
1
;
}
...
...
@@ -1476,7 +1476,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
return
-
1
;
}
if
(
pNew
->
nextColId
<
0
&&
pNew
->
nextColId
>=
0x7fff
-
ncols
){
if
(
pNew
->
nextColId
<
0
||
pNew
->
nextColId
>=
0x7fff
-
ncols
){
terrno
=
TSDB_CODE_MND_FIELD_VALUE_OVERFLOW
;
return
-
1
;
}
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
8356533e
...
...
@@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
static
int32_t
mndPersistSubChangeVgReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqSubscribeObj
*
pSub
,
const
SMqRebOutputVg
*
pRebVg
)
{
if
(
pRebVg
->
oldConsumerId
==
pRebVg
->
newConsumerId
)
{
terrno
=
TSDB_CODE_MND_INVALID_SUB_OPTION
;
return
-
1
;
}
//
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
//
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
//
return -1;
//
}
void
*
buf
;
int32_t
tlen
;
...
...
@@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
}
}
static
void
putNoTransferToOutput
(
SMqRebOutputObj
*
pOutput
,
SMqConsumerEp
*
pConsumerEp
){
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pConsumerEp
->
vgs
);
i
++
){
SMqVgEp
*
pVgEp
=
(
SMqVgEp
*
)
taosArrayGetP
(
pConsumerEp
->
vgs
,
i
);
SMqRebOutputVg
outputVg
=
{
.
oldConsumerId
=
pConsumerEp
->
consumerId
,
.
newConsumerId
=
pConsumerEp
->
consumerId
,
.
pVgEp
=
pVgEp
,
};
taosArrayPush
(
pOutput
->
rebVgs
,
&
outputVg
);
}
}
static
void
transferVgroupsForConsumers
(
SMqRebOutputObj
*
pOutput
,
SHashObj
*
pHash
,
int32_t
minVgCnt
,
int32_t
imbConsumerNum
)
{
const
char
*
pSubKey
=
pOutput
->
pSub
->
key
;
...
...
@@ -290,24 +302,19 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
taosArrayPush
(
pOutput
->
modifyConsumers
,
&
pConsumerEp
->
consumerId
);
if
(
consumerVgNum
>
minVgCnt
)
{
if
(
imbCnt
<
imbConsumerNum
)
{
if
(
consumerVgNum
==
minVgCnt
+
1
)
{
imbCnt
++
;
continue
;
}
else
{
// pop until equal minVg + 1
while
(
taosArrayGetSize
(
pConsumerEp
->
vgs
)
>
minVgCnt
+
1
)
{
SMqVgEp
*
pVgEp
=
*
(
SMqVgEp
**
)
taosArrayPop
(
pConsumerEp
->
vgs
);
SMqRebOutputVg
outputVg
=
{
.
oldConsumerId
=
pConsumerEp
->
consumerId
,
.
newConsumerId
=
-
1
,
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"sub:%s mq rebalance remove vgId:%d from consumer:0x%"
PRIx64
",(first scan)"
,
pSubKey
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
imbCnt
++
;
// pop until equal minVg + 1
while
(
taosArrayGetSize
(
pConsumerEp
->
vgs
)
>
minVgCnt
+
1
)
{
SMqVgEp
*
pVgEp
=
*
(
SMqVgEp
**
)
taosArrayPop
(
pConsumerEp
->
vgs
);
SMqRebOutputVg
outputVg
=
{
.
oldConsumerId
=
pConsumerEp
->
consumerId
,
.
newConsumerId
=
-
1
,
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"sub:%s mq rebalance remove vgId:%d from consumer:0x%"
PRIx64
",(first scan)"
,
pSubKey
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
imbCnt
++
;
}
else
{
// all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
while
(
taosArrayGetSize
(
pConsumerEp
->
vgs
)
>
minVgCnt
)
{
...
...
@@ -323,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
}
}
}
putNoTransferToOutput
(
pOutput
,
pConsumerEp
);
}
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
8356533e
...
...
@@ -445,6 +445,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
}
int32_t
tqProcessSubscribeReq
(
STQ
*
pTq
,
int64_t
sversion
,
char
*
msg
,
int32_t
msgLen
)
{
int
ret
=
0
;
SMqRebVgReq
req
=
{
0
};
tDecodeSMqRebVgReq
(
msg
,
&
req
);
...
...
@@ -463,8 +464,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if
(
req
.
newConsumerId
==
-
1
)
{
tqError
(
"vgId:%d, tq invalid re-balance request, new consumerId %"
PRId64
""
,
req
.
vgId
,
req
.
newConsumerId
);
taosMemoryFree
(
req
.
qmsg
);
return
0
;
goto
end
;
}
STqHandle
tqHandle
=
{
0
};
...
...
@@ -481,8 +481,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// TODO version should be assigned and refed during preprocess
SWalRef
*
pRef
=
walRefCommittedVer
(
pVnode
->
pWal
);
if
(
pRef
==
NULL
)
{
taosMemoryFree
(
req
.
qmsg
)
;
return
-
1
;
ret
=
-
1
;
goto
end
;
}
int64_t
ver
=
pRef
->
refVer
;
...
...
@@ -534,49 +534,42 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosHashPut
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pHandle
,
sizeof
(
STqHandle
));
tqDebug
(
"try to persist handle %s consumer:0x%"
PRIx64
" , old consumer:0x%"
PRIx64
,
req
.
subKey
,
pHandle
->
consumerId
,
oldConsumerId
);
if
(
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
taosMemoryFree
(
req
.
qmsg
);
return
-
1
;
}
ret
=
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
goto
end
;
}
else
{
if
(
pHandle
->
consumerId
==
req
.
newConsumerId
)
{
// do nothing
tqInfo
(
"vgId:%d consumer:0x%"
PRIx64
" remains, no switch occurs"
,
req
.
vgId
,
req
.
newConsumerId
);
atomic_store_32
(
&
pHandle
->
epoch
,
-
1
);
atomic_add_fetch_32
(
&
pHandle
->
epoch
,
1
);
taosMemoryFree
(
req
.
qmsg
);
return
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
}
else
{
tqInfo
(
"vgId:%d switch consumer from Id:0x%"
PRIx64
" to Id:0x%"
PRIx64
,
req
.
vgId
,
pHandle
->
consumerId
,
req
.
newConsumerId
);
// kill executing task
qTaskInfo_t
pTaskInfo
=
pHandle
->
execHandle
.
task
;
if
(
pTaskInfo
!=
NULL
)
{
qKillTask
(
pTaskInfo
,
TSDB_CODE_SUCCESS
);
}
taosWLockLatch
(
&
pTq
->
lock
);
atomic_store_64
(
&
pHandle
->
consumerId
,
req
.
newConsumerId
);
atomic_store_32
(
&
pHandle
->
epoch
,
0
);
}
// kill executing task
qTaskInfo_t
pTaskInfo
=
pHandle
->
execHandle
.
task
;
if
(
pTaskInfo
!=
NULL
)
{
qKillTask
(
pTaskInfo
,
TSDB_CODE_SUCCESS
);
}
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle
(
pTq
,
pHandle
);
atomic_store_64
(
&
pHandle
->
consumerId
,
req
.
newConsumerId
);
taosWLockLatch
(
&
pTq
->
lock
);
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle
(
pTq
,
pHandle
);
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
qStreamCloseTsdbReader
(
pTaskInfo
);
}
taosWUnLockLatch
(
&
pTq
->
lock
);
if
(
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
taosMemoryFree
(
req
.
qmsg
);
return
-
1
;
}
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
qStreamCloseTsdbReader
(
pTaskInfo
);
}
taosWUnLockLatch
(
&
pTq
->
lock
);
ret
=
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
goto
end
;
}
end:
taosMemoryFree
(
req
.
qmsg
);
return
0
;
return
ret
;
}
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int64_t
ver
)
{
...
...
source/dnode/vnode/src/tq/tqUtil.c
浏览文件 @
8356533e
...
...
@@ -165,12 +165,19 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
SRpcMsg
*
pMsg
,
STqOffsetVal
*
pOffset
)
{
uint64_t
consumerId
=
pRequest
->
consumerId
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
int
code
=
0
;
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pRequest
,
pHandle
->
execHandle
.
subType
);
qTaskInfo_t
task
=
pHandle
->
execHandle
.
task
;
if
(
qTaskIsExecuting
(
task
)){
code
=
tqSendDataRsp
(
pTq
,
pMsg
,
pRequest
,
&
dataRsp
,
TMQ_MSG_TYPE__POLL_RSP
);
tDeleteSMqDataRsp
(
&
dataRsp
);
return
code
;
}
qSetTaskId
(
pHandle
->
execHandle
.
task
,
consumerId
,
pRequest
->
reqId
);
int
code
=
tqScanData
(
pTq
,
pHandle
,
&
dataRsp
,
pOffset
);
code
=
tqScanData
(
pTq
,
pHandle
,
&
dataRsp
,
pOffset
);
if
(
code
!=
0
)
{
goto
end
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
8356533e
...
...
@@ -4855,7 +4855,11 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
qTrace
(
"tsdb/read: %p, take read mutex, code: %d"
,
pReader
,
code
);
if
(
pReader
->
flag
==
READER_STATUS_SUSPEND
)
{
tsdbReaderResume
(
pReader
);
code
=
tsdbReaderResume
(
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbReleaseReader
(
pReader
);
return
code
;
}
}
if
(
pReader
->
innerReader
[
0
]
!=
NULL
&&
pReader
->
step
==
0
)
{
...
...
@@ -5133,11 +5137,17 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
}
int32_t
tsdbReaderReset
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
qTrace
(
"tsdb/reader-reset: %p, take read mutex"
,
pReader
);
tsdbAcquireReader
(
pReader
);
if
(
pReader
->
flag
==
READER_STATUS_SUSPEND
)
{
tsdbReaderResume
(
pReader
);
code
=
tsdbReaderResume
(
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbReleaseReader
(
pReader
);
return
code
;
}
}
if
(
isEmptyQueryTimeWindow
(
&
pReader
->
window
)
||
pReader
->
pReadSnap
==
NULL
)
{
...
...
@@ -5172,8 +5182,6 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int64_t
ts
=
asc
?
pReader
->
window
.
skey
-
1
:
pReader
->
window
.
ekey
+
1
;
resetAllDataBlockScanInfo
(
pStatus
->
pTableMap
,
ts
,
step
);
int32_t
code
=
0
;
// no data in files, let's try buffer in memory
if
(
pStatus
->
fileIter
.
numOfFiles
==
0
)
{
pStatus
->
loadFromFile
=
false
;
...
...
@@ -5218,7 +5226,11 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
// find the start data block in file
tsdbAcquireReader
(
pReader
);
if
(
pReader
->
flag
==
READER_STATUS_SUSPEND
)
{
tsdbReaderResume
(
pReader
);
code
=
tsdbReaderResume
(
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbReleaseReader
(
pReader
);
return
code
;
}
}
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
...
...
@@ -5286,12 +5298,17 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
}
int64_t
tsdbGetNumOfRowsInMemTable
(
STsdbReader
*
pReader
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int64_t
rows
=
0
;
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
tsdbAcquireReader
(
pReader
);
if
(
pReader
->
flag
==
READER_STATUS_SUSPEND
)
{
tsdbReaderResume
(
pReader
);
code
=
tsdbReaderResume
(
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbReleaseReader
(
pReader
);
return
code
;
}
}
int32_t
iter
=
0
;
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
8356533e
...
...
@@ -5572,7 +5572,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
}
int32_t
len
=
sprintf
(
st
+
VARSTR_HEADER_SIZE
,
"Total_Blocks=[%d] Total_Size=[%.2f K
b] Average_size=[%.2f Kb
] Compression_Ratio=[%.2f %c]"
,
"Total_Blocks=[%d] Total_Size=[%.2f K
B] Average_size=[%.2f KB
] Compression_Ratio=[%.2f %c]"
,
pData
->
numOfBlocks
,
pData
->
totalSize
/
1024
.
0
,
averageSize
/
1024
.
0
,
compRatio
,
'%'
);
varDataSetLen
(
st
,
len
);
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
8356533e
...
...
@@ -6130,17 +6130,50 @@ static bool isEventWindowQuery(SSelectStmt* pSelect) {
return
NULL
!=
pSelect
->
pWindow
&&
QUERY_NODE_EVENT_WINDOW
==
nodeType
(
pSelect
->
pWindow
);
}
static
bool
hasJsonTypeProjection
(
SSelectStmt
*
pSelect
)
{
SNode
*
pProj
=
NULL
;
FOREACH
(
pProj
,
pSelect
->
pProjectionList
)
{
if
(
TSDB_DATA_TYPE_JSON
==
((
SExprNode
*
)
pProj
)
->
resType
.
type
)
{
return
true
;
}
}
return
false
;
}
static
EDealRes
hasColumnOrPseudoColumn
(
SNode
*
pNode
,
void
*
pContext
)
{
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
*
(
bool
*
)
pContext
=
true
;
return
DEAL_RES_END
;
}
if
(
QUERY_NODE_FUNCTION
==
nodeType
(
pNode
)
&&
fmIsPseudoColumnFunc
(((
SFunctionNode
*
)
pNode
)
->
funcId
))
{
*
(
bool
*
)
pContext
=
true
;
return
DEAL_RES_END
;
}
return
DEAL_RES_CONTINUE
;
}
static
int32_t
subtableExprHasColumnOrPseudoColumn
(
SNode
*
pNode
)
{
bool
hasColumn
=
false
;
nodesWalkExprPostOrder
(
pNode
,
hasColumnOrPseudoColumn
,
&
hasColumn
);
return
hasColumn
;
}
static
int32_t
checkStreamQuery
(
STranslateContext
*
pCxt
,
SCreateStreamStmt
*
pStmt
)
{
SSelectStmt
*
pSelect
=
(
SSelectStmt
*
)
pStmt
->
pQuery
;
if
(
TSDB_DATA_TYPE_TIMESTAMP
!=
((
SExprNode
*
)
nodesListGetNode
(
pSelect
->
pProjectionList
,
0
))
->
resType
.
type
||
!
pSelect
->
isTimeLineResult
||
crossTableWithoutAggOper
(
pSelect
)
||
NULL
!=
pSelect
->
pOrderByList
||
crossTableWithUdaf
(
pSelect
)
||
isEventWindowQuery
(
pSelect
))
{
crossTableWithUdaf
(
pSelect
)
||
isEventWindowQuery
(
pSelect
)
||
hasJsonTypeProjection
(
pSelect
)
)
{
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_STREAM_QUERY
,
"Unsupported stream query"
);
}
if
(
NULL
!=
pSelect
->
pSubtable
&&
TSDB_DATA_TYPE_VARCHAR
!=
((
SExprNode
*
)
pSelect
->
pSubtable
)
->
resType
.
type
)
{
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_STREAM_QUERY
,
"SUBTABLE expression must be of VARCHAR type"
);
}
if
(
NULL
!=
pSelect
->
pSubtable
&&
0
==
LIST_LENGTH
(
pSelect
->
pPartitionByList
)
&&
subtableExprHasColumnOrPseudoColumn
(
pSelect
->
pSubtable
))
{
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_STREAM_QUERY
,
"SUBTABLE expression must not has column when no partition by clause"
);
}
if
(
NULL
==
pSelect
->
pWindow
&&
STREAM_TRIGGER_AT_ONCE
!=
pStmt
->
pOptions
->
triggerType
)
{
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_STREAM_QUERY
,
"The trigger mode of non window query can only be AT_ONCE"
);
...
...
source/libs/parser/test/parInitialCTest.cpp
浏览文件 @
8356533e
...
...
@@ -920,6 +920,10 @@ TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
run
(
"CREATE STREAM s1 INTO st1 AS SELECT PERCENTILE(c1, 30) FROM t1 INTERVAL(10S)"
,
TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC
);
run
(
"CREATE STREAM s2 INTO st1 AS SELECT ts, to_json('{c1:1}') FROM st1 PARTITION BY TBNAME"
,
TSDB_CODE_PAR_INVALID_STREAM_QUERY
);
run
(
"CREATE STREAM s3 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tbname)) "
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 INTERVAL(10S)"
,
TSDB_CODE_PAR_INVALID_STREAM_QUERY
);
}
/*
...
...
source/libs/scalar/test/CMakeLists.txt
浏览文件 @
8356533e
enable_testing
()
#
add_subdirectory(filter)
add_subdirectory
(
filter
)
add_subdirectory
(
scalar
)
source/libs/scalar/test/filter/filterTests.cpp
浏览文件 @
8356533e
...
...
@@ -33,6 +33,7 @@
#include "os.h"
#include "filter.h"
#include "filterInt.h"
#include "nodes.h"
#include "scalar.h"
#include "stub.h"
...
...
@@ -344,6 +345,7 @@ TEST(timerangeTest, greater_and_lower_not_strict) {
nodesDestroyNode
(
logicNode1
);
}
#if 0
TEST(columnTest, smallint_column_greater_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5] = {1, 2, 3, 4, 5};
...
...
@@ -1337,6 +1339,127 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) {
nodesDestroyNode(logicNode1);
blockDataDestroy(src);
}
#endif
template
<
class
SignedT
,
class
UnsignedT
>
int32_t
compareSignedWithUnsigned
(
SignedT
l
,
UnsignedT
r
)
{
if
(
l
<
0
)
return
-
1
;
auto
l_uint64
=
static_cast
<
uint64_t
>
(
l
);
auto
r_uint64
=
static_cast
<
uint64_t
>
(
r
);
if
(
l_uint64
<
r_uint64
)
return
-
1
;
if
(
l_uint64
>
r_uint64
)
return
1
;
return
0
;
}
template
<
class
UnsignedT
,
class
SignedT
>
int32_t
compareUnsignedWithSigned
(
UnsignedT
l
,
SignedT
r
)
{
if
(
r
<
0
)
return
1
;
auto
l_uint64
=
static_cast
<
uint64_t
>
(
l
);
auto
r_uint64
=
static_cast
<
uint64_t
>
(
r
);
if
(
l_uint64
<
r_uint64
)
return
-
1
;
if
(
l_uint64
>
r_uint64
)
return
1
;
return
0
;
}
template
<
class
SignedT
,
class
UnsignedT
>
void
doCompareWithValueRange_SignedWithUnsigned
(
__compar_fn_t
fp
)
{
int32_t
signedMin
=
-
10
,
signedMax
=
10
;
int32_t
unsignedMin
=
0
,
unsignedMax
=
10
;
for
(
SignedT
l
=
signedMin
;
l
<=
signedMax
;
++
l
)
{
for
(
UnsignedT
r
=
unsignedMin
;
r
<=
unsignedMax
;
++
r
)
{
ASSERT_EQ
(
fp
(
&
l
,
&
r
),
compareSignedWithUnsigned
(
l
,
r
));
}
}
}
template
<
class
UnsignedT
,
class
SignedT
>
void
doCompareWithValueRange_UnsignedWithSigned
(
__compar_fn_t
fp
)
{
int32_t
signedMin
=
-
10
,
signedMax
=
10
;
int32_t
unsignedMin
=
0
,
unsignedMax
=
10
;
for
(
UnsignedT
l
=
unsignedMin
;
l
<=
unsignedMax
;
++
l
)
{
for
(
SignedT
r
=
signedMin
;
r
<=
signedMax
;
++
r
)
{
ASSERT_EQ
(
fp
(
&
l
,
&
r
),
compareUnsignedWithSigned
(
l
,
r
));
}
}
}
template
<
class
LType
>
void
doCompareWithValueRange_OnlyLeftType
(
__compar_fn_t
fp
,
int32_t
rType
)
{
switch
(
rType
)
{
case
TSDB_DATA_TYPE_UTINYINT
:
doCompareWithValueRange_SignedWithUnsigned
<
LType
,
uint8_t
>
(
fp
);
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
doCompareWithValueRange_SignedWithUnsigned
<
LType
,
uint16_t
>
(
fp
);
break
;
case
TSDB_DATA_TYPE_UINT
:
doCompareWithValueRange_SignedWithUnsigned
<
LType
,
uint32_t
>
(
fp
);
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
doCompareWithValueRange_SignedWithUnsigned
<
LType
,
uint64_t
>
(
fp
);
break
;
case
TSDB_DATA_TYPE_TINYINT
:
doCompareWithValueRange_UnsignedWithSigned
<
LType
,
int8_t
>
(
fp
);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
doCompareWithValueRange_UnsignedWithSigned
<
LType
,
int16_t
>
(
fp
);
break
;
case
TSDB_DATA_TYPE_INT
:
doCompareWithValueRange_UnsignedWithSigned
<
LType
,
int32_t
>
(
fp
);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
doCompareWithValueRange_UnsignedWithSigned
<
LType
,
int64_t
>
(
fp
);
break
;
default:
FAIL
();
}
}
void
doCompare
(
const
std
::
vector
<
int32_t
>
&
lTypes
,
const
std
::
vector
<
int32_t
>
&
rTypes
,
int32_t
oper
)
{
for
(
int
i
=
0
;
i
<
lTypes
.
size
();
++
i
)
{
for
(
int
j
=
0
;
j
<
rTypes
.
size
();
++
j
)
{
auto
fp
=
filterGetCompFuncEx
(
lTypes
[
i
],
rTypes
[
j
],
oper
);
switch
(
lTypes
[
i
])
{
case
TSDB_DATA_TYPE_TINYINT
:
doCompareWithValueRange_OnlyLeftType
<
int8_t
>
(
fp
,
rTypes
[
j
]);
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
doCompareWithValueRange_OnlyLeftType
<
int16_t
>
(
fp
,
rTypes
[
j
]);
break
;
case
TSDB_DATA_TYPE_INT
:
doCompareWithValueRange_OnlyLeftType
<
int32_t
>
(
fp
,
rTypes
[
j
]);
break
;
case
TSDB_DATA_TYPE_BIGINT
:
doCompareWithValueRange_OnlyLeftType
<
int64_t
>
(
fp
,
rTypes
[
j
]);
break
;
case
TSDB_DATA_TYPE_UTINYINT
:
doCompareWithValueRange_OnlyLeftType
<
uint8_t
>
(
fp
,
rTypes
[
j
]);
break
;
case
TSDB_DATA_TYPE_USMALLINT
:
doCompareWithValueRange_OnlyLeftType
<
uint16_t
>
(
fp
,
rTypes
[
j
]);
break
;
case
TSDB_DATA_TYPE_UINT
:
doCompareWithValueRange_OnlyLeftType
<
uint32_t
>
(
fp
,
rTypes
[
j
]);
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
doCompareWithValueRange_OnlyLeftType
<
uint64_t
>
(
fp
,
rTypes
[
j
]);
break
;
default:
FAIL
();
}
}
}
}
TEST
(
dataCompareTest
,
signed_and_unsigned_int
)
{
std
::
vector
<
int32_t
>
lType
=
{
TSDB_DATA_TYPE_TINYINT
,
TSDB_DATA_TYPE_SMALLINT
,
TSDB_DATA_TYPE_INT
,
TSDB_DATA_TYPE_BIGINT
};
std
::
vector
<
int32_t
>
rType
=
{
TSDB_DATA_TYPE_UTINYINT
,
TSDB_DATA_TYPE_USMALLINT
,
TSDB_DATA_TYPE_UINT
,
TSDB_DATA_TYPE_UBIGINT
};
doCompare
(
lType
,
rType
,
OP_TYPE_GREATER_THAN
);
doCompare
(
rType
,
lType
,
OP_TYPE_GREATER_THAN
);
}
int
main
(
int
argc
,
char
**
argv
)
{
taosSeedRand
(
taosGetTimestampSec
());
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
8356533e
...
...
@@ -212,6 +212,11 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
return
-
1
;
}
if
(
streamMetaCommit
(
pMeta
)
<
0
)
{
tFreeStreamTask
(
pTask
);
return
-
1
;
}
taosHashPut
(
pMeta
->
pTasks
,
&
pTask
->
id
.
taskId
,
sizeof
(
int32_t
),
&
pTask
,
POINTER_BYTES
);
taosArrayPush
(
pMeta
->
pTaskList
,
&
pTask
->
id
.
taskId
);
return
0
;
...
...
source/libs/sync/src/syncRespMgr.c
浏览文件 @
8356533e
...
...
@@ -171,6 +171,8 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
}
void
syncRespCleanRsp
(
SSyncRespMgr
*
pObj
)
{
if
(
pObj
==
NULL
)
return
;
SSyncNode
*
pNode
=
pObj
->
data
;
sTrace
(
"vgId:%d, clean all resp"
,
pNode
->
vgId
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
8356533e
...
...
@@ -587,12 +587,12 @@ void* destroyConnPool(SCliThrd* pThrd) {
static
SCliConn
*
getConnFromPool
(
SCliThrd
*
pThrd
,
char
*
key
,
bool
*
exceed
)
{
void
*
pool
=
pThrd
->
pool
;
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
)
+
1
);
STrans
*
pTranInst
=
pThrd
->
pTransInst
;
if
(
plist
==
NULL
)
{
SConnList
list
=
{
0
};
taosHashPut
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
),
(
void
*
)
&
list
,
sizeof
(
list
));
plist
=
taosHashGet
(
pool
,
key
,
strlen
(
key
));
taosHashPut
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
)
+
1
,
(
void
*
)
&
list
,
sizeof
(
list
));
plist
=
taosHashGet
(
pool
,
key
,
strlen
(
key
)
+
1
);
SMsgList
*
nList
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgList
));
QUEUE_INIT
(
&
nList
->
msgQ
);
...
...
@@ -627,11 +627,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
static
SCliConn
*
getConnFromPool2
(
SCliThrd
*
pThrd
,
char
*
key
,
SCliMsg
**
pMsg
)
{
void
*
pool
=
pThrd
->
pool
;
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
)
+
1
);
if
(
plist
==
NULL
)
{
SConnList
list
=
{
0
};
taosHashPut
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
),
(
void
*
)
&
list
,
sizeof
(
list
));
plist
=
taosHashGet
(
pool
,
key
,
strlen
(
key
));
taosHashPut
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
)
+
1
,
(
void
*
)
&
list
,
sizeof
(
list
));
plist
=
taosHashGet
(
pool
,
key
,
strlen
(
key
)
+
1
);
SMsgList
*
nList
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgList
));
QUEUE_INIT
(
&
nList
->
msgQ
);
...
...
@@ -717,7 +717,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
cliDestroyConnMsgs
(
conn
,
false
);
if
(
conn
->
list
==
NULL
)
{
conn
->
list
=
taosHashGet
((
SHashObj
*
)
pool
,
conn
->
ip
,
strlen
(
conn
->
ip
));
conn
->
list
=
taosHashGet
((
SHashObj
*
)
pool
,
conn
->
ip
,
strlen
(
conn
->
ip
)
+
1
);
}
SConnList
*
pList
=
conn
->
list
;
...
...
@@ -822,7 +822,8 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return
;
}
if
(
nread
<
0
)
{
tWarn
(
"%s conn %p read error:%s, ref:%d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
uv_err_name
(
nread
),
T_REF_VAL_GET
(
conn
));
tDebug
(
"%s conn %p read error:%s, ref:%d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
uv_err_name
(
nread
),
T_REF_VAL_GET
(
conn
));
conn
->
broken
=
true
;
cliHandleExcept
(
conn
);
}
...
...
@@ -875,8 +876,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
connList
->
list
->
numOfConn
--
;
connList
->
size
--
;
}
else
{
SConnList
*
connList
=
taosHashGet
((
SHashObj
*
)
pThrd
->
pool
,
conn
->
ip
,
strlen
(
conn
->
ip
));
connList
->
list
->
numOfConn
--
;
SConnList
*
connList
=
taosHashGet
((
SHashObj
*
)
pThrd
->
pool
,
conn
->
ip
,
strlen
(
conn
->
ip
)
+
1
);
if
(
connList
!=
NULL
)
connList
->
list
->
numOfConn
--
;
}
conn
->
list
=
NULL
;
pThrd
->
newConnCount
--
;
...
...
@@ -1269,7 +1270,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
if
(
pMsg
!=
NULL
&&
REQUEST_NO_RESP
(
&
pMsg
->
msg
)
&&
(
pTransInst
->
failFastFp
!=
NULL
&&
pTransInst
->
failFastFp
(
pMsg
->
msg
.
msgType
)))
{
SFailFastItem
*
item
=
taosHashGet
(
pThrd
->
failFastCache
,
pConn
->
ip
,
strlen
(
pConn
->
ip
));
SFailFastItem
*
item
=
taosHashGet
(
pThrd
->
failFastCache
,
pConn
->
ip
,
strlen
(
pConn
->
ip
)
+
1
);
int64_t
cTimestamp
=
taosGetTimestampMs
();
if
(
item
!=
NULL
)
{
int32_t
elapse
=
cTimestamp
-
item
->
timestamp
;
...
...
@@ -1281,7 +1282,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
}
}
else
{
SFailFastItem
item
=
{.
count
=
1
,
.
timestamp
=
cTimestamp
};
taosHashPut
(
pThrd
->
failFastCache
,
pConn
->
ip
,
strlen
(
pConn
->
ip
),
&
item
,
sizeof
(
SFailFastItem
));
taosHashPut
(
pThrd
->
failFastCache
,
pConn
->
ip
,
strlen
(
pConn
->
ip
)
+
1
,
&
item
,
sizeof
(
SFailFastItem
));
}
}
}
else
{
...
...
@@ -1459,7 +1460,7 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
}
static
FORCE_INLINE
uint32_t
cliGetIpFromFqdnCache
(
SHashObj
*
cache
,
char
*
fqdn
)
{
uint32_t
addr
=
0
;
uint32_t
*
v
=
taosHashGet
(
cache
,
fqdn
,
strlen
(
fqdn
));
uint32_t
*
v
=
taosHashGet
(
cache
,
fqdn
,
strlen
(
fqdn
)
+
1
);
if
(
v
==
NULL
)
{
addr
=
taosGetIpv4FromFqdn
(
fqdn
);
if
(
addr
==
0xffffffff
)
{
...
...
@@ -1468,7 +1469,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
return
addr
;
}
taosHashPut
(
cache
,
fqdn
,
strlen
(
fqdn
),
&
addr
,
sizeof
(
addr
));
taosHashPut
(
cache
,
fqdn
,
strlen
(
fqdn
)
+
1
,
&
addr
,
sizeof
(
addr
));
}
else
{
addr
=
*
v
;
}
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
8356533e
...
...
@@ -314,7 +314,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return
;
}
t
Warn
(
"%s conn %p read error:%s"
,
transLabel
(
pTransInst
),
conn
,
uv_err_name
(
nread
));
t
Debug
(
"%s conn %p read error:%s"
,
transLabel
(
pTransInst
),
conn
,
uv_err_name
(
nread
));
if
(
nread
<
0
)
{
conn
->
broken
=
true
;
if
(
conn
->
status
==
ConnAcquire
)
{
...
...
source/libs/wal/src/walRead.c
浏览文件 @
8356533e
...
...
@@ -37,7 +37,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
if
(
cond
)
{
pReader
->
cond
=
*
cond
;
}
else
{
pReader
->
cond
.
scanUncommited
=
0
;
//
pReader->cond.scanUncommited = 0;
pReader
->
cond
.
scanNotApplied
=
0
;
pReader
->
cond
.
scanMeta
=
0
;
pReader
->
cond
.
enableRef
=
0
;
...
...
@@ -74,13 +74,18 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t
lastVer
=
walGetLastVer
(
pReader
->
pWal
);
int64_t
committedVer
=
walGetCommittedVer
(
pReader
->
pWal
);
int64_t
appliedVer
=
walGetAppliedVer
(
pReader
->
pWal
);
int64_t
endVer
=
pReader
->
cond
.
scanUncommited
?
lastVer
:
committedVer
;
endVer
=
TMIN
(
appliedVer
,
endVer
);
while
(
appliedVer
<
committedVer
){
// wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
wDebug
(
"vgId:%d, wal apply ver:%"
PRId64
" smaller than commit ver:%"
PRId64
", so sleep 1ms"
,
pReader
->
pWal
->
cfg
.
vgId
,
appliedVer
,
committedVer
);
taosMsleep
(
1
);
appliedVer
=
walGetAppliedVer
(
pReader
->
pWal
);
}
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
// endVer = TMIN(appliedVer, endVer);
wDebug
(
"vgId:%d, wal start to fetch, index:%"
PRId64
", last index:%"
PRId64
" commit index:%"
PRId64
", applied index:%"
PRId64
", end index:%"
PRId64
,
pReader
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
appliedVer
,
endVer
);
while
(
fetchVer
<=
en
dVer
)
{
", applied index:%"
PRId64
,
pReader
->
pWal
->
cfg
.
vgId
,
fetchVer
,
lastVer
,
committedVer
,
appliedVer
);
while
(
fetchVer
<=
committe
dVer
)
{
if
(
walFetchHeadNew
(
pReader
,
fetchVer
)
<
0
)
{
return
-
1
;
}
...
...
source/util/src/tcompare.c
浏览文件 @
8356533e
...
...
@@ -308,17 +308,19 @@ int32_t compareInt8Uint16(const void *pLeft, const void *pRight) {
int32_t
compareInt8Uint32
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int8_t
left
=
GET_INT8_VAL
(
pLeft
);
if
(
left
<
0
)
return
-
1
;
uint32_t
right
=
GET_UINT32_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
(
uint32_t
)
left
>
right
)
return
1
;
if
(
(
uint32_t
)
left
<
right
)
return
-
1
;
return
0
;
}
int32_t
compareInt8Uint64
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int8_t
left
=
GET_INT8_VAL
(
pLeft
);
if
(
left
<
0
)
return
-
1
;
uint64_t
right
=
GET_UINT64_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
(
uint64_t
)
left
>
right
)
return
1
;
if
(
(
uint64_t
)
left
<
right
)
return
-
1
;
return
0
;
}
...
...
@@ -380,17 +382,19 @@ int32_t compareInt16Uint16(const void *pLeft, const void *pRight) {
int32_t
compareInt16Uint32
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int16_t
left
=
GET_INT16_VAL
(
pLeft
);
if
(
left
<
0
)
return
-
1
;
uint32_t
right
=
GET_UINT32_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
(
uint32_t
)
left
>
right
)
return
1
;
if
(
(
uint32_t
)
left
<
right
)
return
-
1
;
return
0
;
}
int32_t
compareInt16Uint64
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int16_t
left
=
GET_INT16_VAL
(
pLeft
);
if
(
left
<
0
)
return
-
1
;
uint64_t
right
=
GET_UINT64_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
(
uint64_t
)
left
>
right
)
return
1
;
if
(
(
uint64_t
)
left
<
right
)
return
-
1
;
return
0
;
}
...
...
@@ -452,17 +456,19 @@ int32_t compareInt32Uint16(const void *pLeft, const void *pRight) {
int32_t
compareInt32Uint32
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int32_t
left
=
GET_INT32_VAL
(
pLeft
);
if
(
left
<
0
)
return
-
1
;
uint32_t
right
=
GET_UINT32_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
(
uint32_t
)
left
>
right
)
return
1
;
if
(
(
uint32_t
)
left
<
right
)
return
-
1
;
return
0
;
}
int32_t
compareInt32Uint64
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int32_t
left
=
GET_INT32_VAL
(
pLeft
);
if
(
left
<
0
)
return
-
1
;
uint64_t
right
=
GET_UINT64_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
(
uint64_t
)
left
>
right
)
return
1
;
if
(
(
uint64_t
)
left
<
right
)
return
-
1
;
return
0
;
}
...
...
@@ -532,9 +538,10 @@ int32_t compareInt64Uint32(const void *pLeft, const void *pRight) {
int32_t
compareInt64Uint64
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
int64_t
left
=
GET_INT64_VAL
(
pLeft
);
if
(
left
<
0
)
return
-
1
;
uint64_t
right
=
GET_UINT64_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
(
uint64_t
)
left
>
right
)
return
1
;
if
(
(
uint64_t
)
left
<
right
)
return
-
1
;
return
0
;
}
...
...
@@ -857,24 +864,27 @@ int32_t compareUint16Uint64(const void *pLeft, const void *pRight) {
int32_t
compareUint32Int8
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
uint32_t
left
=
GET_UINT32_VAL
(
pLeft
);
int8_t
right
=
GET_INT8_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
right
<
0
)
return
1
;
if
(
left
>
(
uint32_t
)
right
)
return
1
;
if
(
left
<
(
uint32_t
)
right
)
return
-
1
;
return
0
;
}
int32_t
compareUint32Int16
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
uint32_t
left
=
GET_UINT32_VAL
(
pLeft
);
int16_t
right
=
GET_INT16_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
right
<
0
)
return
1
;
if
(
left
>
(
uint32_t
)
right
)
return
1
;
if
(
left
<
(
uint32_t
)
right
)
return
-
1
;
return
0
;
}
int32_t
compareUint32Int32
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
uint32_t
left
=
GET_UINT32_VAL
(
pLeft
);
int32_t
right
=
GET_INT32_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
right
<
0
)
return
1
;
if
(
left
>
(
uint32_t
)
right
)
return
1
;
if
(
left
<
(
uint32_t
)
right
)
return
-
1
;
return
0
;
}
...
...
@@ -929,32 +939,36 @@ int32_t compareUint32Uint64(const void *pLeft, const void *pRight) {
int32_t
compareUint64Int8
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
uint64_t
left
=
GET_UINT64_VAL
(
pLeft
);
int8_t
right
=
GET_INT8_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
right
<
0
)
return
1
;
if
(
left
>
(
uint64_t
)
right
)
return
1
;
if
(
left
<
(
uint64_t
)
right
)
return
-
1
;
return
0
;
}
int32_t
compareUint64Int16
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
uint64_t
left
=
GET_UINT64_VAL
(
pLeft
);
int16_t
right
=
GET_INT16_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
right
<
0
)
return
1
;
if
(
left
>
(
uint64_t
)
right
)
return
1
;
if
(
left
<
(
uint64_t
)
right
)
return
-
1
;
return
0
;
}
int32_t
compareUint64Int32
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
uint64_t
left
=
GET_UINT64_VAL
(
pLeft
);
int32_t
right
=
GET_INT32_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
right
<
0
)
return
1
;
if
(
left
>
(
uint64_t
)
right
)
return
1
;
if
(
left
<
(
uint64_t
)
right
)
return
-
1
;
return
0
;
}
int32_t
compareUint64Int64
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
uint64_t
left
=
GET_UINT64_VAL
(
pLeft
);
int64_t
right
=
GET_INT64_VAL
(
pRight
);
if
(
left
>
right
)
return
1
;
if
(
left
<
right
)
return
-
1
;
if
(
right
<
0
)
return
1
;
if
(
left
>
(
uint64_t
)
right
)
return
1
;
if
(
left
<
(
uint64_t
)
right
)
return
-
1
;
return
0
;
}
...
...
tests/pytest/auto_crash_gen.py
浏览文件 @
8356533e
...
...
@@ -342,12 +342,29 @@ def main():
print
(
'======== crash_gen run sucess and exit as expected ========'
)
try
:
text
=
f
'''exit status:
{
msg_dict
[
status
]
}
git commit :
{
git_commit
}
hostname:
{
hostname
}
start time:
{
starttime
}
end time:
{
endtime
}
cmd:
{
crash_cmds
}
'''
cmd
=
crash_cmds
.
split
(
'&'
)[
2
]
if
status
==
0
:
log_dir
=
"none"
else
:
log_dir
=
"/root/pxiao/crash_gen_logs"
if
status
==
3
:
core_dir
=
"/root/pxiao/crash_gen_logs"
else
:
core_dir
=
"none"
text
=
f
'''
exit status:
{
msg_dict
[
status
]
}
test scope: crash_gen
owner: pxiao
hostname:
{
hostname
}
start time:
{
starttime
}
end time:
{
endtime
}
git commit :
{
git_commit
}
log dir:
{
log_dir
}
core dir:
{
core_dir
}
cmd:
{
cmd
}
'''
send_msg
(
get_msg
(
text
))
except
Exception
as
e
:
print
(
"exception:"
,
e
)
...
...
tests/pytest/auto_crash_gen_valgrind.py
浏览文件 @
8356533e
...
...
@@ -377,13 +377,30 @@ def main():
print
(
'======== crash_gen run sucess and exit as expected ========'
)
try
:
text
=
f
'''exit status:
{
msg_dict
[
status
]
}
git commit :
{
git_commit
}
hostname:
{
hostname
}
start time:
{
starttime
}
end time:
{
endtime
}
cmd:
{
crash_cmds
}
'''
send_msg
(
get_msg
(
text
))
cmd
=
crash_cmds
.
split
(
'&'
)[
2
]
if
status
==
0
:
log_dir
=
"none"
else
:
log_dir
=
"/root/pxiao/crash_gen_logs"
if
status
==
3
:
core_dir
=
"/root/pxiao/crash_gen_logs"
else
:
core_dir
=
"none"
text
=
f
'''
exit status:
{
msg_dict
[
status
]
}
test scope: crash_gen
owner: pxiao
hostname:
{
hostname
}
start time:
{
starttime
}
end time:
{
endtime
}
git commit :
{
git_commit
}
log dir:
{
log_dir
}
core dir:
{
core_dir
}
cmd:
{
cmd
}
'''
send_msg
(
get_msg
(
text
))
except
Exception
as
e
:
print
(
"exception:"
,
e
)
exit
(
status
)
...
...
tests/pytest/auto_crash_gen_valgrind_cluster.py
浏览文件 @
8356533e
...
...
@@ -377,12 +377,29 @@ def main():
print
(
'======== crash_gen run sucess and exit as expected ========'
)
try
:
text
=
f
'''exit status:
{
msg_dict
[
status
]
}
git commit :
{
git_commit
}
hostname:
{
hostname
}
start time:
{
starttime
}
end time:
{
endtime
}
cmd:
{
crash_cmds
}
'''
cmd
=
crash_cmds
.
split
(
'&'
)[
2
]
if
status
==
0
:
log_dir
=
"none"
else
:
log_dir
=
"/root/pxiao/crash_gen_logs"
if
status
==
3
:
core_dir
=
"/root/pxiao/crash_gen_logs"
else
:
core_dir
=
"none"
text
=
f
'''
exit status:
{
msg_dict
[
status
]
}
test scope: crash_gen
owner: pxiao
hostname:
{
hostname
}
start time:
{
starttime
}
end time:
{
endtime
}
git commit :
{
git_commit
}
log dir:
{
log_dir
}
core dir:
{
core_dir
}
cmd:
{
cmd
}
'''
send_msg
(
get_msg
(
text
))
except
Exception
as
e
:
print
(
"exception:"
,
e
)
...
...
tests/system-test/win-test-file
浏览文件 @
8356533e
...
...
@@ -279,7 +279,7 @@ python3 ./test.py -f 7-tmq/subscribeDb1.py
python3 ./test.py -f 7-tmq/subscribeDb2.py
python3 ./test.py -f 7-tmq/subscribeDb3.py
python3 ./test.py -f 7-tmq/subscribeDb4.py
#
python3 ./test.py -f 7-tmq/subscribeStb.py
python3 ./test.py -f 7-tmq/subscribeStb.py
python3 ./test.py -f 7-tmq/subscribeStb0.py
python3 ./test.py -f 7-tmq/subscribeStb1.py
python3 ./test.py -f 7-tmq/subscribeStb2.py
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录