Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8eb17a28
T
TDengine
项目概览
taosdata
/
TDengine
11 个月 前同步成功
通知
1179
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
8eb17a28
编写于
7月 18, 2022
作者:
G
Ganlin Zhao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into fix/TD-17400
上级
f57ad234
2453331a
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
320 addition
and
122 deletion
+320
-122
cmake/cmake.platform
cmake/cmake.platform
+9
-0
include/common/ttime.h
include/common/ttime.h
+6
-5
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+3
-6
source/common/src/ttime.c
source/common/src/ttime.c
+4
-4
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+0
-1
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+3
-2
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+3
-2
source/os/src/osTime.c
source/os/src/osTime.c
+76
-2
tests/system-test/7-tmq/stbTagFilter.py
tests/system-test/7-tmq/stbTagFilter.py
+203
-56
tests/test/c/tmqDemo.c
tests/test/c/tmqDemo.c
+2
-1
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+5
-15
tests/tsim/src/simExe.c
tests/tsim/src/simExe.c
+3
-14
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+3
-14
未找到文件。
cmake/cmake.platform
浏览文件 @
8eb17a28
...
...
@@ -76,15 +76,19 @@ IF ("${CPUTYPE}" STREQUAL "")
IF (CMAKE_SYSTEM_PROCESSOR MATCHES "(amd64)|(AMD64)")
MESSAGE(STATUS "The current platform is amd64")
SET(PLATFORM_ARCH_STR "amd64")
SET(TD_INTEL_64 TRUE)
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "(x86)|(X86)")
MESSAGE(STATUS "The current platform is x86")
SET(PLATFORM_ARCH_STR "i386")
SET(TD_INTEL_32 TRUE)
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "armv7l")
MESSAGE(STATUS "The current platform is aarch32")
SET(PLATFORM_ARCH_STR "arm")
SET(TD_ARM_32 TRUE)
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64")
MESSAGE(STATUS "The current platform is aarch64")
SET(PLATFORM_ARCH_STR "arm64")
SET(TD_ARM_64 TRUE)
ENDIF ()
ELSE ()
# if generate ARM version:
...
...
@@ -92,18 +96,23 @@ ELSE ()
IF (${CPUTYPE} MATCHES "aarch32")
SET(PLATFORM_ARCH_STR "arm")
MESSAGE(STATUS "input cpuType: aarch32")
SET(TD_ARM_32 TRUE)
ELSEIF (${CPUTYPE} MATCHES "aarch64")
SET(PLATFORM_ARCH_STR "arm64")
MESSAGE(STATUS "input cpuType: aarch64")
SET(TD_ARM_64 TRUE)
ELSEIF (${CPUTYPE} MATCHES "mips64")
SET(PLATFORM_ARCH_STR "mips")
MESSAGE(STATUS "input cpuType: mips64")
SET(TD_MIPS_64 TRUE)
ELSEIF (${CPUTYPE} MATCHES "x64")
SET(PLATFORM_ARCH_STR "amd64")
MESSAGE(STATUS "input cpuType: x64")
SET(TD_INTEL_64 TRUE)
ELSEIF (${CPUTYPE} MATCHES "x86")
SET(PLATFORM_ARCH_STR "i386")
MESSAGE(STATUS "input cpuType: x86")
SET(TD_INTEL_32 TRUE)
ELSE ()
MESSAGE(STATUS "input cpuType unknown " ${CPUTYPE})
ENDIF ()
...
...
include/common/ttime.h
浏览文件 @
8eb17a28
...
...
@@ -63,12 +63,13 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
:
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
?
1000000
:
1000000000
;
time_t
t
=
taosTime
(
NULL
);
struct
tm
*
tm
=
taosLocalTime
(
&
t
,
NULL
);
tm
->
tm_hour
=
0
;
tm
->
tm_min
=
0
;
tm
->
tm_sec
=
0
;
struct
tm
tm
;
taosLocalTime
(
&
t
,
&
tm
);
tm
.
tm_hour
=
0
;
tm
.
tm_min
=
0
;
tm
.
tm_sec
=
0
;
return
(
int64_t
)
taosMktime
(
tm
)
*
factor
;
return
(
int64_t
)
taosMktime
(
&
tm
)
*
factor
;
}
int64_t
taosTimeAdd
(
int64_t
t
,
int64_t
duration
,
char
unit
,
int32_t
precision
);
...
...
source/common/src/tdatablock.c
浏览文件 @
8eb17a28
...
...
@@ -1661,9 +1661,6 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
*/
#ifdef WINDOWS
if
(
tt
<
0
)
tt
=
0
;
#endif
if
(
tt
<=
0
&&
ms
<
0
)
{
tt
--
;
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
...
...
@@ -1674,9 +1671,9 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
ms
+=
1000
;
}
}
struct
tm
*
ptm
=
taosLocalTime
(
&
tt
,
NULL
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
ptm
);
struct
tm
ptm
=
{
0
};
taosLocalTime
(
&
tt
,
&
ptm
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
&
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
sprintf
(
buf
+
pos
,
".%09d"
,
ms
);
...
...
source/common/src/ttime.c
浏览文件 @
8eb17a28
...
...
@@ -902,7 +902,7 @@ const char* fmtts(int64_t ts) {
void
taosFormatUtcTime
(
char
*
buf
,
int32_t
bufLen
,
int64_t
t
,
int32_t
precision
)
{
char
ts
[
40
]
=
{
0
};
struct
tm
*
ptm
;
struct
tm
ptm
;
int32_t
fractionLen
;
char
*
format
=
NULL
;
...
...
@@ -939,10 +939,10 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision)
assert
(
false
);
}
ptm
=
taosLocalTime
(
&
quot
,
NULL
);
int32_t
length
=
(
int32_t
)
strftime
(
ts
,
40
,
"%Y-%m-%dT%H:%M:%S"
,
ptm
);
taosLocalTime
(
&
quot
,
&
ptm
);
int32_t
length
=
(
int32_t
)
strftime
(
ts
,
40
,
"%Y-%m-%dT%H:%M:%S"
,
&
ptm
);
length
+=
snprintf
(
ts
+
length
,
fractionLen
,
format
,
mod
);
length
+=
(
int32_t
)
strftime
(
ts
+
length
,
40
-
length
,
"%z"
,
ptm
);
length
+=
(
int32_t
)
strftime
(
ts
+
length
,
40
-
length
,
"%z"
,
&
ptm
);
tstrncpy
(
buf
,
ts
,
bufLen
);
}
source/libs/executor/src/timewindowoperator.c
浏览文件 @
8eb17a28
...
...
@@ -63,7 +63,6 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
,
SExecTaskInfo
*
pTaskInfo
)
{
assert
(
win
->
skey
<=
win
->
ekey
);
SResultRow
*
pResultRow
=
doSetResultOutBufByKey
(
pAggSup
->
pResultBuf
,
pResultRowInfo
,
(
char
*
)
&
win
->
skey
,
TSDB_KEYSIZE
,
masterscan
,
tableGroupId
,
pTaskInfo
,
true
,
pAggSup
);
...
...
source/libs/function/src/builtins.c
浏览文件 @
8eb17a28
...
...
@@ -194,8 +194,9 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
void
static
addTimezoneParam
(
SNodeList
*
pList
)
{
char
buf
[
6
]
=
{
0
};
time_t
t
=
taosTime
(
NULL
);
struct
tm
*
tmInfo
=
taosLocalTime
(
&
t
,
NULL
);
strftime
(
buf
,
sizeof
(
buf
),
"%z"
,
tmInfo
);
struct
tm
tmInfo
;
taosLocalTime
(
&
t
,
&
tmInfo
);
strftime
(
buf
,
sizeof
(
buf
),
"%z"
,
&
tmInfo
);
int32_t
len
=
(
int32_t
)
strlen
(
buf
);
SValueNode
*
pVal
=
(
SValueNode
*
)
nodesMakeNode
(
QUERY_NODE_VALUE
);
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
8eb17a28
...
...
@@ -1062,8 +1062,9 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
memmove
(
fraction
,
fraction
+
TSDB_TIME_PRECISION_SEC_DIGITS
,
TSDB_TIME_PRECISION_SEC_DIGITS
);
}
struct
tm
*
tmInfo
=
taosLocalTime
((
const
time_t
*
)
&
timeVal
,
NULL
);
strftime
(
buf
,
sizeof
(
buf
),
"%Y-%m-%dT%H:%M:%S"
,
tmInfo
);
struct
tm
tmInfo
;
taosLocalTime
((
const
time_t
*
)
&
timeVal
,
&
tmInfo
);
strftime
(
buf
,
sizeof
(
buf
),
"%Y-%m-%dT%H:%M:%S"
,
&
tmInfo
);
int32_t
len
=
(
int32_t
)
strlen
(
buf
);
//add timezone string
...
...
source/os/src/osTime.c
浏览文件 @
8eb17a28
...
...
@@ -357,14 +357,88 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
time_t
taosTime
(
time_t
*
t
)
{
return
time
(
t
);
}
time_t
taosMktime
(
struct
tm
*
timep
)
{
return
mktime
(
timep
);
}
time_t
taosMktime
(
struct
tm
*
timep
)
{
#ifdef WINDOWS
struct
tm
tm1
=
{
0
};
LARGE_INTEGER
t
;
FILETIME
f
;
SYSTEMTIME
s
;
FILETIME
ff
;
SYSTEMTIME
ss
;
LARGE_INTEGER
offset
;
time_t
tt
=
0
;
localtime_s
(
&
tm1
,
&
tt
);
ss
.
wYear
=
tm1
.
tm_year
+
1900
;
ss
.
wMonth
=
tm1
.
tm_mon
+
1
;
ss
.
wDay
=
tm1
.
tm_wday
;
ss
.
wHour
=
tm1
.
tm_hour
;
ss
.
wMinute
=
tm1
.
tm_min
;
ss
.
wSecond
=
tm1
.
tm_sec
;
ss
.
wMilliseconds
=
0
;
SystemTimeToFileTime
(
&
ss
,
&
ff
);
offset
.
QuadPart
=
ff
.
dwHighDateTime
;
offset
.
QuadPart
<<=
32
;
offset
.
QuadPart
|=
ff
.
dwLowDateTime
;
s
.
wYear
=
timep
->
tm_year
+
1900
;
s
.
wMonth
=
timep
->
tm_mon
+
1
;
s
.
wDay
=
timep
->
tm_wday
;
s
.
wHour
=
timep
->
tm_hour
;
s
.
wMinute
=
timep
->
tm_min
;
s
.
wSecond
=
timep
->
tm_sec
;
s
.
wMilliseconds
=
0
;
SystemTimeToFileTime
(
&
s
,
&
f
);
t
.
QuadPart
=
f
.
dwHighDateTime
;
t
.
QuadPart
<<=
32
;
t
.
QuadPart
|=
f
.
dwLowDateTime
;
t
.
QuadPart
-=
offset
.
QuadPart
;
return
(
time_t
)(
t
.
QuadPart
/
10000000
);
#else
return
mktime
(
timep
);
#endif
}
struct
tm
*
taosLocalTime
(
const
time_t
*
timep
,
struct
tm
*
result
)
{
if
(
result
==
NULL
)
{
return
localtime
(
timep
);
}
#ifdef WINDOWS
localtime_s
(
result
,
timep
);
if
(
*
timep
<
0
)
{
SYSTEMTIME
ss
,
s
;
FILETIME
ff
,
f
;
LARGE_INTEGER
offset
;
struct
tm
tm1
;
time_t
tt
=
0
;
localtime_s
(
&
tm1
,
&
tt
);
ss
.
wYear
=
tm1
.
tm_year
+
1900
;
ss
.
wMonth
=
tm1
.
tm_mon
+
1
;
ss
.
wDay
=
tm1
.
tm_mday
;
ss
.
wHour
=
tm1
.
tm_hour
;
ss
.
wMinute
=
tm1
.
tm_min
;
ss
.
wSecond
=
tm1
.
tm_sec
;
ss
.
wMilliseconds
=
0
;
SystemTimeToFileTime
(
&
ss
,
&
ff
);
offset
.
QuadPart
=
ff
.
dwHighDateTime
;
offset
.
QuadPart
<<=
32
;
offset
.
QuadPart
|=
ff
.
dwLowDateTime
;
offset
.
QuadPart
+=
*
timep
*
10000000
;
f
.
dwLowDateTime
=
offset
.
QuadPart
&
0xffffffff
;
f
.
dwHighDateTime
=
(
offset
.
QuadPart
>>
32
)
&
0xffffffff
;
FileTimeToSystemTime
(
&
f
,
&
s
);
result
->
tm_sec
=
s
.
wSecond
;
result
->
tm_min
=
s
.
wMinute
;
result
->
tm_hour
=
s
.
wHour
;
result
->
tm_mday
=
s
.
wDay
;
result
->
tm_mon
=
s
.
wMonth
-
1
;
result
->
tm_year
=
s
.
wYear
-
1900
;
result
->
tm_wday
=
s
.
wDayOfWeek
;
result
->
tm_yday
=
0
;
result
->
tm_isdst
=
0
;
}
else
{
localtime_s
(
result
,
timep
);
}
#else
localtime_r
(
timep
,
result
);
#endif
...
...
tests/system-test/7-tmq/stbTagFilter.py
浏览文件 @
8eb17a28
...
...
@@ -5,103 +5,250 @@ import time
import
socket
import
os
import
threading
from
enum
import
Enum
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
__init__
(
self
):
self
.
snapshot
=
0
self
.
vgroups
=
4
self
.
ctbNum
=
1
self
.
rowsPerTbl
=
10000
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"========
test case 1
: "
)
paraDict
=
{
'dbName'
:
'db
2
'
,
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"========
prepare test env include database, stable, ctables, and insert data
: "
)
paraDict
=
{
'dbName'
:
'db
t
'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
2
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1
0
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
10
,
'ctbNum'
:
1
,
'rowsPerTbl'
:
1000
00
,
'batchNum'
:
1
20
0
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
}
'showRow'
:
1
,
'snapshot'
:
0
}
topicNameList
=
[
'topic1'
]
expectRowsList
=
[]
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
1
,
replica
=
1
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
]
,
replica
=
1
)
tdLog
.
info
(
"create stb"
)
t
dCom
.
create_stable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
column_elm_list
=
paraDict
[
'colSchema'
],
tag_elm_list
=
paraDict
[
'tagSchema'
])
t
mqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
'ctbNum'
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
asyncInsertData
(
paraDict
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1
,
'rowsPerTbl'
:
100000
,
'batchNum'
:
3000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
# update to half tables
paraDict
[
'rowsPerTbl'
]
=
int
(
self
.
rowsPerTbl
/
2
)
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
# queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
queryString
=
"select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# paraDict['ctbNum'] = self.ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
consumerId
=
0
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
)
topicList
=
topicFromStb1
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdLog
.
info
(
"run select sql from db"
)
tdSql
.
query
(
queryString
)
expectrowcnt
=
tdSql
.
getRows
()
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
))
if
totalConsumeRows
!=
expectrowcnt
:
tdLog
.
exit
(
"tmq consume rows error!"
)
tmqCom
.
checkFileContent
(
consumerId
,
queryString
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
tmqCase2
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 2: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
5000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
# update to half tables
paraDict
[
'startTs'
]
=
paraDict
[
'startTs'
]
+
int
(
self
.
rowsPerTbl
/
2
)
paraDict
[
'rowsPerTbl'
]
=
int
(
self
.
rowsPerTbl
/
2
)
tmqCom
.
insert_data_with_autoCreateTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog
.
info
(
"create topics from stb with filter"
)
# queryString = "select ts, sin(c1), pow(c2,3) from %s.%s where t2 == 'beijing' or t2 == 'changsha'" %(paraDict['dbName'], paraDict['stbName'])
queryString
=
"select * from %s.%s where t2 == 'beijing' or t2 == 'changsha'"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
0
],
queryString
)
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
#
start tmq consume processor
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
topicList
=
topicNameList
[
0
]
ifcheckdata
=
0
tdSql
.
execute
(
sqlString
)
#
paraDict['ctbNum'] = self.ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
consumerId
=
1
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
)
topicList
=
topicFromStb1
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:2000, auto.offset.reset:earliest'
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
paraDict
[
'pollDelay'
],
paraDict
[
"dbName"
],
paraDict
[
'showMsg'
],
paraDict
[
'showRow'
])
# tmqCom.getStartCommitNotifyFromTmqsim()
tmqCom
.
getStartConsumeNotifyFromTmqsim
()
tdLog
.
info
(
"create some new ctb"
)
paraDict
[
'ctbStartIdx'
]
=
paraDict
[
'ctbStartIdx'
]
+
paraDict
[
'ctbNum'
]
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
'ctbNum'
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data into new ctb"
)
pThread
=
tmqCom
.
asyncInsertData
(
paraDict
)
pThread
.
join
()
tdLog
.
info
(
"wait insert end"
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
tdLog
.
info
(
"wait the consume result"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
tdLog
.
exit
(
"0 tmq consume rows error!"
)
tdLog
.
info
(
"act consume rows: %d, act insert rows: %d, expect consume rows: %d, "
%
(
totalConsumeRows
,
totalRowsInserted
,
expectrowcnt
))
time
.
sleep
(
10
)
for
i
in
range
(
len
(
topicNameList
)):
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
if
totalConsumeRows
!=
expectrowcnt
:
tdLog
.
exit
(
"tmq consume rows error!"
)
# tmqCom.checkFileContent(consumerId, queryString)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
prepareTestEnv
()
tdLog
.
printNoPrefix
(
"============================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
tmqCase1
()
# self.tmqCase2()
self
.
prepareTestEnv
()
tdLog
.
printNoPrefix
(
"===================================================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
self
.
snapshot
=
1
self
.
tmqCase1
()
# self.tmqCase2()
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/test/c/tmqDemo.c
浏览文件 @
8eb17a28
...
...
@@ -596,7 +596,8 @@ void printParaIntoFile() {
g_fp
=
pFile
;
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
struct
tm
tm
;
taosLocalTime
(
&
tTime
,
&
tm
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
taosFprintfFile
(
pFile
,
"# configDir: %s
\n
"
,
configDir
);
...
...
tests/test/c/tmqSim.c
浏览文件 @
8eb17a28
...
...
@@ -171,7 +171,8 @@ static void printHelp() {
char
*
getCurrentTimeString
(
char
*
timeString
)
{
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
struct
tm
tm
;
taosLocalTime
(
&
tTime
,
&
tm
);
sprintf
(
timeString
,
"%d-%02d-%02d %02d:%02d:%02d"
,
tm
.
tm_year
+
1900
,
tm
.
tm_mon
+
1
,
tm
.
tm_mday
,
tm
.
tm_hour
,
tm
.
tm_min
,
tm
.
tm_sec
);
...
...
@@ -420,18 +421,6 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
ms
=
val
%
1000
;
}
/*
comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if
(
tt
<
0
)
tt
=
0
;
#endif
if
(
tt
<=
0
&&
ms
<
0
)
{
tt
--
;
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
...
...
@@ -443,8 +432,9 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
}
}
struct
tm
*
ptm
=
taosLocalTime
(
&
tt
,
NULL
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
ptm
);
struct
tm
ptm
;
taosLocalTime
(
&
tt
,
&
ptm
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
&
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
sprintf
(
buf
+
pos
,
".%09d"
,
ms
);
...
...
tests/tsim/src/simExe.c
浏览文件 @
8eb17a28
...
...
@@ -635,7 +635,7 @@ bool simCreateTaosdConnect(SScript *script, char *rest) {
bool
simExecuteNativeSqlCommand
(
SScript
*
script
,
char
*
rest
,
bool
isSlow
)
{
char
timeStr
[
30
]
=
{
0
};
time_t
tt
;
struct
tm
*
tp
;
struct
tm
tp
;
SCmdLine
*
line
=
&
script
->
lines
[
script
->
linePos
];
int32_t
ret
=
-
1
;
...
...
@@ -768,20 +768,9 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
}
else
{
tt
=
(
*
(
int64_t
*
)
row
[
i
])
/
1000000000
;
}
/* comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if
(
tt
<
0
)
tt
=
0
;
#endif
t
p
=
taosLocalTime
(
&
tt
,
NULL
);
strftime
(
timeStr
,
64
,
"%y-%m-%d %H:%M:%S"
,
tp
);
t
aosLocalTime
(
&
tt
,
&
tp
);
strftime
(
timeStr
,
64
,
"%y-%m-%d %H:%M:%S"
,
&
tp
);
if
(
precision
==
TSDB_TIME_PRECISION_MILLI
)
{
sprintf
(
value
,
"%s.%03d"
,
timeStr
,
(
int32_t
)(
*
((
int64_t
*
)
row
[
i
])
%
1000
));
}
else
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
...
...
tools/shell/src/shellEngine.c
浏览文件 @
8eb17a28
...
...
@@ -231,18 +231,6 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
ms
=
val
%
1000
;
}
/*
comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if
(
tt
<
0
)
tt
=
0
;
#endif
if
(
tt
<=
0
&&
ms
<
0
)
{
tt
--
;
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
...
...
@@ -254,8 +242,9 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
}
}
struct
tm
*
ptm
=
taosLocalTime
(
&
tt
,
NULL
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
ptm
);
struct
tm
ptm
=
{
0
};
taosLocalTime
(
&
tt
,
&
ptm
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
&
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
sprintf
(
buf
+
pos
,
".%09d"
,
ms
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录