taosdata / TDengine

Commit bece7ecb
Authored Nov 24, 2022 by kailixu
Merge branch '2.6' into feat/TS-1883-2.6
Parents: edc3cda0, c0edaaf7

Showing 17 changed files with 699 additions and 28 deletions (+699 −28)
examples/go/demo.go              +578  −0
src/client/src/tscServer.c         +2  −1
src/client/src/tscUtil.c           +1  −1
src/common/inc/tglobal.h           +1  −0
src/common/src/tglobal.c          +13  −0
src/inc/taoserror.h                +1  −0
src/kit/taos-tools                 +1  −1
src/mnode/src/mnodeTable.c        +46  −0
src/os/src/detail/osSysinfo.c      +8  −0
src/plugins/CMakeLists.txt        +20  −17
src/query/inc/qExtbuffer.h         +1  −3
src/query/src/qExecutor.c          +7  −1
src/query/src/qExtbuffer.c         +8  −2
src/query/src/queryMain.c          +9  −0
src/tsdb/src/tsdbRead.c            +1  −1
src/util/inc/tconfig.h             +1  −1
src/util/src/terror.c              +1  −0
examples/go/demo.go (new file, mode 0 → 100644)
package main

import (
    "container/heap"
    "database/sql"
    "database/sql/driver"
    "flag"
    "fmt"
    "reflect"
    "runtime"
    "strconv"
    "strings"
    "sync"
    "time"

    _ "github.com/taosdata/driver-go/v2/taosSql"
)

type heapElem struct {
    timeout int64
    colName string
}

type MinHeap []heapElem

type Column struct {
}
func (h MinHeap) Len() int {
    return len(h)
}

func (h MinHeap) Less(i, j int) bool {
    res := h[i].timeout - h[j].timeout
    if res < 0 {
        return true
    } else if res > 0 {
        return false
    }
    cmp := strings.Compare(h[i].colName, h[j].colName)
    if cmp <= 0 {
        return true
    } else {
        return false
    }
}

func (h *MinHeap) Swap(i, j int) {
    (*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}

func (h *MinHeap) Push(x interface{}) {
    *h = append(*h, x.(heapElem))
}

func (h *MinHeap) Empty() bool {
    if len(*h) == 0 {
        return true
    }
    return false
}

func (h *MinHeap) Top() heapElem {
    return (*h)[0]
}

func (h *MinHeap) Pop() interface{} {
    res := (*h)[len(*h)-1]
    *h = (*h)[:len(*h)-1]
    return res
}
type config struct {
    hostName   string
    serverPort int
    user       string
    password   string
    dbName     string
    srcdbName  string
    supTblName string
}

var configPara config
var taosDriverName = "taosSql"
var url string

func init() {
    flag.StringVar(&configPara.hostName, "h", "127.0.0.1", "The host to connect to TDengine server.")
    flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
    flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
    flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
    flag.StringVar(&configPara.dbName, "d", "test1", "check database.")
    flag.StringVar(&configPara.srcdbName, "s", "test", "Destination database.")
    flag.Parse()
}

func checkErr(err error, prompt string) {
    if err != nil {
        fmt.Printf("%s\n", prompt)
        panic(err)
    }
}
type schema struct {
    idx        int
    numOfField int
    timestamp  time.Time
    colName    string
    interval   int32
    threshold  int32
}

type demo struct {
    db          *sql.DB
    dbname      string
    srcdbname   string
    metaTable   string
    exceptTable string

    dStartTs   int64
    dInterval  int32
    dThreshold int32

    suptabname string
    metaDict   map[string]*schema
    heap       MinHeap
    timer      *time.Timer
    wg         *sync.WaitGroup
}

/***
|ts   |colName     |interval |threshold|
|now  |stbx.tx.colx|2        |5        |
|now+1|stbx.tx.colx|2        |5        |
|now+2|stbx.tx.colx|2        |5        |
***/
type taskInfo struct {
    wg      *sync.WaitGroup
    subtask map[string]*demo
}

type tableInfo struct {
    tbname     string
    createTime string
    columns    int
    stbname    string
    uid        int64
    tid        int64
    vgId       int32
}
func GetSubTableInfo(db *sql.DB, dbname, stbname string) []tableInfo {
    tbs := make([]tableInfo, 0, 512)
    sql := "show " + dbname + ".tables"
    row, err := db.Query(sql)
    if err != nil {
        checkErr(err, sql)
    }
    for row.Next() {
        var (
            tbname     string
            createTime string
            columns    int
            stb        string
            uid        int64
            tid        int64
            vgId       int32
        )
        err := row.Scan(&tbname, &createTime, &columns, &stb, &uid, &tid, &vgId)
        if err != nil {
            checkErr(err, sql)
        }
        if len(stbname) == 0 {
            // skip normal table
            if len(stb) == 0 || strings.Compare(stb, "") == 0 {
                continue
            }
            tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stb, uid: uid, tid: tid, vgId: vgId})
            continue
        }
        if strings.Compare(stb, stbname) == 0 {
            tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId})
        }
    }
    row.Close()
    return tbs
}

func GetStableField(db *sql.DB, dbname, stbname string) []string {
    result := make([]string, 0, 10)
    sql := "describe " + dbname + "." + stbname
    row, err := db.Query(sql)
    if err != nil {
        checkErr(err, sql)
    }
    count := 0
    for row.Next() {
        var field string
        var ty string
        var tlen int32
        var note string
        row.Scan(&field, &ty, &tlen, &note)
        // ignore time and tag col
        if count != 0 && strings.Compare(note, "TAG") != 0 {
            // skip first and skip tag col
            result = append(result, field)
        }
        count = count + 1
    }
    row.Close()
    return result
}
func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) {
    {
        sql := fmt.Sprintf("drop database if exists %s", dbname)
        _, err := db.Exec(sql)
        checkErr(err, sql)
    }
    {
        sql := fmt.Sprintf("create database if not exists %s update 2", dbname)
        _, err := db.Exec(sql)
        checkErr(err, sql)
    }
    {
        sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", dbname, exptable)
        _, err := db.Exec(sql)
        checkErr(err, sql)
    }
    {
        sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", dbname, metatable)
        _, err := db.Exec(sql)
        checkErr(err, sql)
    }

    tbs := GetSubTableInfo(db, srcdbname, "")
    fmt.Printf("tbs size %d\n", len(tbs))

    fieldDict := make(map[string][]string)
    fieldTs := time.Now().Add(time.Hour * -1000)
    for _, e := range tbs {
        tbname := e.tbname
        stbname := e.stbname
        field, ok := fieldDict[stbname]
        if !ok {
            field = GetStableField(db, srcdbname, stbname)
            fieldDict[stbname] = field
        }
        for _, f := range field {
            insertTableInfoIntoMetatable := func(db *sql.DB, metaDB string, metaTable string, srcDB string, srcTable string, srcCol string, ts time.Time, interval, threshold int) {
                sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", metaDB, metaTable, ts.UnixMilli(), srcDB, srcTable, srcCol, interval, threshold)
                _, err := db.Exec(sql)
                if err != nil {
                    checkErr(err, sql)
                }
            }
            insertTableInfoIntoMetatable(db, dbname, metatable, srcdbname, tbname, f, fieldTs, 2, 2*3)
            fieldTs = fieldTs.Add(time.Millisecond * 2)
        }
        key := fmt.Sprintf("%s_%s", srcdbname, stbname)
        _, ok = tskinfo.subtask[key]
        if !ok {
            tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname,
                metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg, dInterval: 2, dThreshold: 2 * 3}
        }
    }
}
func subTaskStart(d *demo) {
    d.Init()
    for {
        select {
        case <-d.timer.C:
            timeout := d.NextTimout()
            fmt.Printf("stbname %s, timeout %d\n", d.suptabname, timeout)
            d.timer.Reset(time.Second * time.Duration(timeout))
        }
    }
    d.wg.Done()
}
func (d *demo) Init() {
    d.heap = make(MinHeap, 0, 200)
    heap.Init(&d.heap)
    d.metaDict = make(map[string]*schema)

    tbs := GetSubTableInfo(d.db, d.srcdbname, d.suptabname)
    fields := GetStableField(d.db, d.srcdbname, d.suptabname)

    lastRowDict := func(db *sql.DB, srcDB, stbname string) map[string]time.Time {
        result := make(map[string]time.Time)
        sql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", srcDB, stbname)
        row, err := d.db.Query(sql)
        if err != nil {
            checkErr(err, sql)
        }
        for row.Next() {
            var ts time.Time
            var tbname string
            row.Scan(&ts, &tbname)
            result[tbname] = ts
        }
        row.Close()
        return result
    }(d.db, d.srcdbname, d.suptabname)

    for _, e := range tbs {
        tbname := e.tbname
        lastTime, ok := lastRowDict[tbname]
        if !ok {
            lastTime = time.Now()
        }
        for i, f := range fields {
            col := fmt.Sprintf("%s %s", tbname, f)
            d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col,
                interval: int32(d.dInterval), threshold: d.dThreshold}
            {
                expRecordTab := fmt.Sprintf("%s_%s_%s", d.suptabname, tbname, f)
                sql := fmt.Sprintf("create table %s.%s using %s.%s tags(\"%s\")", d.dbname, expRecordTab, d.dbname, d.exceptTable, expRecordTab)
                {
                    fmt.Printf("create TAB: %s\n", sql)
                    _, err := d.db.Exec(sql)
                    checkErr(err, sql)
                }
            }
        }
    }

    now := time.Now()
    for k, v := range d.metaDict {
        durtion := fmt.Sprintf("%ds", v.interval)
        s, _ := time.ParseDuration(durtion)
        now.Add(s)
        heap.Push(&d.heap, heapElem{timeout: now.Unix(), colName: k})
    }
    d.timer = time.NewTimer(time.Second * 1)
}

type ValueRows struct {
    column []interface{}
    ts     time.Time
    tbname string
}
func (d *demo) Update(stbname, tbname, col string, interval int32, threshold int32) {
    key := fmt.Sprintf("%s %s", tbname, col)
    sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colName = \"%s\" ", d.dbname, d.metaTable, d.dbname, tbname, col)
    rows, _ := d.db.Query(sql)
    fmt.Printf("check metatable %s, SQL: %s\n", d.metaTable, sql)
    for rows.Next() {
        var (
            ts     time.Time
            dbname string
            tbname string
            col    string
            inter  int32
            thresh int32
        )
        err := rows.Scan(&ts, &dbname, &tbname, &col, &inter, &thresh)
        if interval != inter || threshold != thresh {
            sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, ts.UnixMilli(), d.dbname, tbname, col, interval, threshold)
            _, err = d.db.Exec(sql)
            if err != nil {
                checkErr(err, sql)
            }
        }
    }
    schemadata := d.metaDict[key]
    if schemadata != nil {
        schemadata.interval = interval
        schemadata.threshold = threshold
    }
    defer rows.Close()
}

type ExceptSQL struct {
    lastTime    time.Time
    exceptTable string
    cost        int32
    col         string
    tab         string
    elem        *schema
    key         string
}
func (d *demo) NextTimout() int32 {
    now := time.Now().Unix()
    colArray := make([]string, 0, 10)
    for !d.heap.Empty() {
        elem := d.heap.Top()
        if elem.timeout <= now {
            colArray = append(colArray, elem.colName)
            heap.Pop(&d.heap)
        } else {
            break
        }
    }

    lastRowGroup, colIdx := func(db *sql.DB, srcDB, stbname string) (map[string]*ValueRows, map[string]int) {
        result := make(map[string]*ValueRows)
        colIdx := make(map[string]int)
        sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", srcDB, stbname)
        row, err := db.Query(sql)
        if err != nil {
            checkErr(err, sql)
        }
        tt, err := row.ColumnTypes()
        types := make([]reflect.Type, len(tt))
        for i, tp := range tt {
            st := tp.ScanType()
            types[i] = st
        }
        columns, _ := row.Columns()
        for row.Next() {
            values := make([]interface{}, len(tt))
            for i := range values {
                values[i] = reflect.New(types[i]).Interface()
            }
            row.Scan(values...)
            ts, _ := values[0].(driver.Valuer).Value()
            tts, _ := ts.(time.Time)
            tbname, _ := values[len(tt)-1].(driver.Valuer).Value()
            ttbname, _ := tbname.(string)
            result[ttbname] = &ValueRows{column: values, ts: tts, tbname: ttbname}
        }
        row.Close()
        for i, v := range columns {
            colIdx[v] = i
        }
        return result, colIdx
    }(d.db, d.srcdbname, d.suptabname)

    expSql := make([]*ExceptSQL, 0, len(colArray))
    for _, e := range colArray {
        elem := d.metaDict[e]
        var colName string
        var tabName string
        fmt.Sscanf(e, "%s %s", &tabName, &colName)

        ts, update := func(rowGroup map[string]*ValueRows, colIdx map[string]int, tabName, colName string) (time.Time, bool) {
            var ts time.Time
            update := false
            field := fmt.Sprintf("last_row(%s)", colName)
            idx, ok1 := colIdx[field]
            row, ok2 := rowGroup[tabName]
            if ok1 && ok2 {
                if row != nil {
                    v, _ := row.column[idx].(driver.Valuer).Value()
                    if v != nil {
                        ts = row.ts
                        update = true
                    }
                }
            }
            return ts, update
        }(lastRowGroup, colIdx, tabName, colName)

        if !update {
            ts = elem.timestamp
        }
        exceptTableName := fmt.Sprintf("%s_%s_%s", d.suptabname, tabName, colName)
        var dura time.Duration = ts.Sub(elem.timestamp)
        cost := int32(dura.Seconds())
        if cost == 0 {
            elem.timestamp = ts
            expSql = append(expSql, &ExceptSQL{exceptTable: exceptTableName, cost: cost, lastTime: ts, col: colName, tab: tabName, elem: elem, key: e})
        } else {
            elem.timestamp = ts
            if cost > elem.threshold {
                expSql = append(expSql, &ExceptSQL{exceptTable: exceptTableName, cost: cost, lastTime: ts, col: colName, tab: tabName, elem: elem, key: e})
            }
        }
        heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e})
    }

    var info strings.Builder
    info.Grow(64 * len(expSql))
    for i, v := range expSql {
        if i == 0 {
            s := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %v, %d )", d.dbname, v.exceptTable, time.Now().UnixMilli(), d.srcdbname, v.tab, v.col, v.lastTime.UnixMilli(), int(time.Now().Sub(v.elem.timestamp).Seconds()))
            info.WriteString(s)
            info.WriteString(" ")
        } else {
            s := fmt.Sprintf("%s.%s values(%v, \"%s\", \"%s\", \"%s\", %v, %d )", d.dbname, v.exceptTable, time.Now().UnixMilli(), d.srcdbname, v.tab, v.col, v.lastTime.UnixMilli(), int(time.Now().Sub(v.elem.timestamp).Seconds()))
            info.WriteString(s)
            info.WriteString(" ")
        }
    }
    if len(expSql) != 0 {
        sql := info.String()
        fmt.Printf("INSERT SQL: %s\n", sql)
        _, err := d.db.Exec(sql)
        checkErr(err, sql)
    }
    if !d.heap.Empty() {
        elem := d.heap.Top()
        timeout := elem.timeout - now
        if timeout < 1 {
            timeout = 1
        }
        return int32(timeout)
    }
    return 1
}
func printAllArgs() {
    fmt.Printf("\n============= args parse result: =============\n")
    fmt.Printf("hostName: %v\n", configPara.hostName)
    fmt.Printf("serverPort: %v\n", configPara.serverPort)
    fmt.Printf("usr: %v\n", configPara.user)
    fmt.Printf("password: %v\n", configPara.password)
    fmt.Printf("dbName: %v\n", configPara.dbName)
    fmt.Printf("srcDbName: %v\n", configPara.srcdbName)
    fmt.Printf("stbNme: %v\n", configPara.supTblName)
    fmt.Printf("================================================\n")
}

func main() {
    printAllArgs()

    cpuNum := runtime.NumCPU()
    fmt.Println("cpu核心数:", cpuNum)
    runtime.GOMAXPROCS(cpuNum)

    url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
    db, err := sql.Open(taosDriverName, url)
    if err != nil {
        checkErr(err, "failed to connect db")
    }

    wg := sync.WaitGroup{}
    info := &taskInfo{subtask: make(map[string]*demo), wg: &wg}
    taskInit(db, configPara.dbName, configPara.srcdbName, "metatable", "exptable", info)

    for _, v := range info.subtask {
        wg.Add(1)
        go subTaskStart(v)
    }
    wg.Wait()
}
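demo.go is a self-contained command-line program. A minimal way to try it, assuming a running TDengine 2.x server, the Go connector (driver-go/v2), and an already populated source database named test, might look roughly like this; the module-setup lines are an assumption for illustration and are not part of the commit:

    cd examples/go
    go mod init demo            # hypothetical module name, only needed outside an existing module
    go get github.com/taosdata/driver-go/v2/taosSql
    go run demo.go -h 127.0.0.1 -p 6030 -u root -P taosdata -d test1 -s test

The flag names, defaults, and connection URL come from init() and main() above. Note that taskInit drops and recreates the check database given by -d on every run, so it should not point at a database whose contents you want to keep.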
src/client/src/tscServer.c

@@ -2955,7 +2955,8 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
   SQueryTableRsp *pQueryAttr = (SQueryTableRsp *)pRes->pRsp;
   if (pQueryAttr == NULL) {
-    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
+    tscError("0x%" PRIx64 " invalid NULL query rsp received", pSql->self);
+    pRes->code = TSDB_CODE_QRY_APP_ERROR;
     return pRes->code;
   }
src/client/src/tscUtil.c

@@ -3264,7 +3264,7 @@ int32_t tscColCondCopy(SArray** dest, const SArray* src, uint64_t uid, int16_t t
   }

   size_t s = taosArrayGetSize(src);
-  *dest = taosArrayInit(s, sizeof(SCond));
+  *dest = taosArrayInit(s, sizeof(STblCond));

   for (int32_t i = 0; i < s; ++i) {
     STblCond* pCond = taosArrayGet(src, i);
src/common/inc/tglobal.h

@@ -69,6 +69,7 @@ extern char tsTempDir[];
 extern int32_t tsShortcutFlag;
 extern int32_t tsMaxSqlGroups;
 extern int8_t  tsSortWhenGroupBy;
+extern int32_t tsQueryRssThreshold;

 // query buffer management
 extern int32_t tsQueryBufferSize;  // maximum allowed usage buffer size in MB for each data node during query processing
src/common/src/tglobal.c

@@ -124,6 +124,9 @@ int32_t tsMaxSqlGroups = 1000000;
 // order by first group by column when group by
 int8_t tsSortWhenGroupBy = 1;

+// memory rss thresold for creating new query in MB. 0 means no threshold limitation
+int32_t tsQueryRssThreshold = 0;
+
 int32_t tsProjectExecInterval = 10000;   // every 10sec, the projection will be executed once
 int64_t tsMaxRetentWindow = 24 * 3600L;  // maximum time window tolerance

@@ -1828,6 +1831,16 @@ static void doInitGlobalConfig(void) {
   cfg.unitType = TAOS_CFG_UTYPE_NONE;
   taosInitConfigOption(cfg);

+  cfg.option = "queryRssThreshold";
+  cfg.ptr = &tsQueryRssThreshold;
+  cfg.valType = TAOS_CFG_VTYPE_INT32;
+  cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
+  cfg.minValue = 0;
+  cfg.maxValue = 2048 * 1024;
+  cfg.ptrLength = 0;
+  cfg.unitType = TAOS_CFG_UTYPE_NONE;
+  taosInitConfigOption(cfg);
+
 #ifdef TD_TSZ
   // lossy compress
   cfg.option = "lossyColumns";
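The new option is registered through taosInitConfigOption with cfgType TSDB_CFG_CTYPE_B_CONFIG, so it should be settable from the server configuration file like the other int32 options. A minimal sketch, assuming the usual taos.cfg and an illustrative limit of 1024 MB (the default 0 disables the check, per the comment in the code above):

    # taos.cfg (illustrative value; unit is MB)
    queryRssThreshold 1024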
src/inc/taoserror.h

@@ -298,6 +298,7 @@ int32_t* taosGetErrno();
 #define TSDB_CODE_QRY_INVALID_TIME_CONDITION  TAOS_DEF_ERROR_CODE(0, 0x070E)  //"invalid time condition")
 #define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION  TAOS_DEF_ERROR_CODE(0, 0x0710)  //"invalid schema version")
 #define TSDB_CODE_QRY_RESULT_TOO_LARGE        TAOS_DEF_ERROR_CODE(0, 0x0711)  //"result num is too large")
+#define TSDB_CODE_QRY_RSS_THRESHOLD           TAOS_DEF_ERROR_CODE(0, 0x0712)  //"query memory rss threshold")

 // grant
 #define TSDB_CODE_GRANT_EXPIRED               TAOS_DEF_ERROR_CODE(0, 0x0800)  //"License expired"
src/kit/taos-tools (submodule: taos-tools @ efa2a5fa, compare 64350feb...efa2a5fa)

-Subproject commit 64350feba73d584f920474d8abcd913491081897
+Subproject commit efa2a5fa2bf0961a238e5485e8a458bf6d85cced
src/mnode/src/mnodeTable.c

@@ -236,6 +236,41 @@ static int32_t mnodeChildTableActionDelete(SSdbRow *pRow) {
   return TSDB_CODE_SUCCESS;
 }

+static void mnodeTableActionUpdateTimeSeries(const char *tableId, int64_t diffCols) {
+  if (diffCols > 0) {
+    grantAdd(TSDB_GRANT_TIMESERIES, diffCols);
+  } else if (diffCols < 0) {
+    grantRestore(TSDB_GRANT_TIMESERIES, -diffCols);
+  } else {
+    return;
+  }
+
+  if (tableId) {
+    SName     sName = {0};
+    char      db[TSDB_DB_NAME_LEN] = {0};
+    SDbObj   *pDb = NULL;
+    SAcctObj *pAcct = NULL;
+    if (tNameFromString(&sName, tableId, T_NAME_ACCT | T_NAME_DB)) {
+      mWarn("table:%s, failed to get SName to update timeseries", tableId);
+      return;
+    }
+    if (tNameExtractFullName((const SName *)&sName, db)) {
+      mWarn("table:%s, failed to extract SName to update timeseries", tableId);
+      return;
+    }
+    if (!(pDb = mnodeGetDb(db))) {
+      mWarn("table:%s, db:%s not exist to update timeseries", tableId, db);
+      return;
+    };
+    if (!(pAcct = mnodeGetAcct(pDb->acct))) {
+      mWarn("table:%s, acct:%s not exist to update timeseries", tableId, pDb->acct);
+      return;
+    }
+    if (pAcct) pAcct->acctInfo.numOfTimeSeries += diffCols;
+  }
+}
+
 static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) {
   SCTableObj *pNew = pRow->pObj;
   SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId);

@@ -246,6 +281,10 @@ static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) {
   void *oldSTable = pTable->superTable;
   int32_t oldRefCount = pTable->refCount;

+  if (pTable->info.type == TSDB_NORMAL_TABLE) {
+    mnodeTableActionUpdateTimeSeries(pTable->info.tableId, pNew->numOfColumns - pTable->numOfColumns);
+  }
+
   memcpy(pTable, pNew, sizeof(SCTableObj));
   pTable->refCount = oldRefCount;

@@ -528,6 +567,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
   int32_t oldRefCount = pTable->refCount;
   int32_t oldNumOfTables = pTable->numOfTables;

+  mnodeTableActionUpdateTimeSeries(pTable->info.tableId,
+                                   (int64_t)oldNumOfTables * (pNew->numOfColumns - pTable->numOfColumns));
+
   memcpy(pTable, pNew, sizeof(SSTableObj));
   pTable->vgHash = oldVgHash;

@@ -1464,6 +1506,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
     pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables);
     mnodeDecAcctRef(pAcct);
   }
+  grantAdd(TSDB_GRANT_TIMESERIES, (uint64_t)ncols * pStable->numOfTables);

   mInfo("msg:%p, app:%p stable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);

@@ -1514,6 +1557,7 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
     pAcct->acctInfo.numOfTimeSeries -= pStable->numOfTables;
     mnodeDecAcctRef(pAcct);
   }
+  grantRestore(TSDB_GRANT_TIMESERIES, pStable->numOfTables);

   mInfo("msg:%p, app:%p stable %s, start to delete column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);

@@ -2471,6 +2515,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
     pAcct->acctInfo.numOfTimeSeries += ncols;
     mnodeDecAcctRef(pAcct);
   }
+  grantAdd(TSDB_GRANT_TIMESERIES, ncols);

   mInfo("msg:%p, app:%p ctable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId);
   monSaveAuditLog(MON_DDL_CMD_ADD_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);

@@ -2506,6 +2551,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
     pAcct->acctInfo.numOfTimeSeries--;
     mnodeDecAcctRef(pAcct);
   }
+  grantRestore(TSDB_GRANT_TIMESERIES, 1);

   mInfo("msg:%p, app:%p ctable %s, start to drop column %s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, colName);
   monSaveAuditLog(MON_DDL_CMD_DROP_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
src/os/src/detail/osSysinfo.c

@@ -466,6 +466,13 @@ bool taosGetNetworkIO(float *netInKb, float *netOutKb) {
 }

 bool taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *rbytes, int64_t *wbytes) {
+#if defined _TD_ARM_
+  if (rchars) *rchars = 0;
+  if (wchars) *wchars = 0;
+  if (rbytes) *rbytes = 0;
+  if (wbytes) *wbytes = 0;
+  return true;
+#else
   FILE *fp = fopen(tsProcIOFile, "r");
   if (fp == NULL) {
     uError("open file:%s failed", tsProcIOFile);

@@ -511,6 +518,7 @@ bool taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *rbytes, int64_t *
   }

   return true;
+#endif
 }

 bool taosGetProcIO(float *rcharKB, float *wcharKB, float *rbyteKB, float *wbyteKB) {
src/plugins/CMakeLists.txt

@@ -50,37 +50,39 @@ ELSE ()
   IF (TD_LINUX)
     include(ExternalProject)
-    set(_upx_prefix "$ENV{HOME}/.taos/externals/upx")
-    ExternalProject_Add(upx
-        PREFIX "${_upx_prefix}"
-        URL https://github.com/upx/upx/releases/download/v3.96/upx-3.96-${PLATFORM_ARCH_STR}_linux.tar.xz
-        CONFIGURE_COMMAND ""
-        BUILD_COMMAND ""
-        INSTALL_COMMAND ""
-        )
+    # set(_upx_prefix "$ENV{HOME}/.taos/externals/upx")
+    # ExternalProject_Add(upx
+    #     PREFIX "${_upx_prefix}"
+    #     URL https://github.com/upx/upx/releases/download/v3.96/upx-3.96-${PLATFORM_ARCH_STR}_linux.tar.xz
+    #     CONFIGURE_COMMAND ""
+    #     BUILD_COMMAND ""
+    #     INSTALL_COMMAND ""
+    #     )

     ExternalProject_Add(taosadapter
         PREFIX "taosadapter"
         SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosadapter
         BUILD_ALWAYS off
-        DEPENDS taos upx
+        # DEPENDS taos upx
+        DEPENDS taos
         BUILD_IN_SOURCE 1
         CONFIGURE_COMMAND cmake -E echo "taosadapter no need cmake to config"
         PATCH_COMMAND
         COMMAND git clean -f -d
         BUILD_COMMAND
         COMMAND cmake -E echo "building taosadapter ..."
-        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../inc CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
-        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../inc CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
+        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../inc CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
+        # COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../inc CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
+        # COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../inc CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
         INSTALL_COMMAND
-        COMMAND ${_upx_prefix}/src/upx/upx taosadapter
+        # COMMAND ${_upx_prefix}/src/upx/upx taosadapter
         COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
         COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
         COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
         COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/
-        COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin
+        # COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin
         )
-    unset(_upx_prefix)
+    # unset(_upx_prefix)
   ELSEIF (TD_DARWIN)
     include(ExternalProject)
     ExternalProject_Add(taosadapter

@@ -93,14 +95,15 @@ ELSE ()
         PATCH_COMMAND
         COMMAND git clean -f -d
         BUILD_COMMAND
-        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../inc CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
-        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../inc CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
+        COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../inc CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
+        # COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../inc CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
+        # COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../inc CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}"
         INSTALL_COMMAND
         COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
         COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
         COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
         COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/
-        COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin
+        # COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin
         )
   ELSE ()
     MESSAGE("${Yellow} Windows system still use original embedded httpd ${ColourReset}")
src/query/inc/qExtbuffer.h

@@ -227,8 +227,6 @@ tOrderDescriptor *tOrderDesCreate(const int32_t *orderColIdx, int32_t numOfOrder
 void tOrderDescDestroy(tOrderDescriptor *pDesc);

-void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);
-
 void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, int32_t srcStartRows, int32_t numOfRowsToWrite, int32_t srcCapacity);

@@ -239,7 +237,7 @@ void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t
 void tColDataMergeSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType);

-void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);
+int32_t taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);

 int32_t compare_sa(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
src/query/src/qExecutor.c

@@ -6311,7 +6311,13 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
   __compar_fn_t  comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order);
   if (pInfo->pDataBlock->info.rows) {
-    taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp);
+    int32_t code = taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp);
+    if (code != TSDB_CODE_SUCCESS) {
+      qError("QInfo:0x%" PRIx64 " can not sort since %s", GET_QID(pOperator->pRuntimeEnv), tstrerror(code));
+      tfree(pCols);
+      tfree(pSchema);
+      longjmp(pOperator->pRuntimeEnv->env, code);
+    }
   }

   tfree(pCols);
src/query/src/qExtbuffer.c

@@ -1257,7 +1257,7 @@ void tOrderDescDestroy(tOrderDescriptor *pDesc) {
   tfree(pDesc);
 }

-void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t idx, __compar_fn_t compareFn) {
+int32_t taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t idx, __compar_fn_t compareFn) {
   assert(numOfRows > 0 && numOfCols > 0 && idx >= 0 && idx < numOfCols);

   int32_t bytes = pSchema[idx].bytes;

@@ -1289,7 +1289,12 @@ void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOf
     // make sure memory buffer is enough
     if (prevLength < bytes1) {
       char *tmp = realloc(p, bytes1 * numOfRows);
-      assert(tmp);
+      if (tmp == NULL) {
+        qError("can not allocated memory. bytes:%d", bytes * numOfRows);
+        tfree(p);
+        tfree(buf);
+        return TSDB_CODE_QRY_OUT_OF_MEMORY;
+      }

       p = tmp;
       prevLength = bytes1;

@@ -1309,4 +1314,5 @@ void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOf

   tfree(buf);
   tfree(p);
+  return TSDB_CODE_SUCCESS;
 }
src/query/src/queryMain.c

@@ -78,6 +78,15 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
     goto _over;
   }

+  float procMemory = 0;
+  if (taosGetProcMemory(&procMemory)) {
+    if (tsQueryRssThreshold > 0 && procMemory >= tsQueryRssThreshold) {
+      qError("Exceeds query memory RSS threshold. RSS: %f, threshold: %d", procMemory, tsQueryRssThreshold);
+      code = TSDB_CODE_QRY_RSS_THRESHOLD;
+      goto _over;
+    }
+  }
+
   if (pQueryMsg->numOfTables <= 0) {
     qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables);
     code = TSDB_CODE_QRY_INVALID_MSG;
src/tsdb/src/tsdbRead.c

@@ -627,7 +627,7 @@ static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) {
   if (!pQueryHandle->pTableCheckInfo) {
     tsdbError("%p table check info is NULL", pQueryHandle);
-    terrno = TSDB_CODE_QRY_APP_ERROR;
+    // terrno = TSDB_CODE_QRY_APP_ERROR;
     return -1;
   }
src/util/inc/tconfig.h

@@ -20,7 +20,7 @@
 extern "C" {
 #endif

-#define TSDB_CFG_MAX_NUM    141
+#define TSDB_CFG_MAX_NUM    142
 #define TSDB_CFG_PRINT_LEN  23
 #define TSDB_CFG_OPTION_LEN 24
 #define TSDB_CFG_VALUE_LEN  41
src/util/src/terror.c

@@ -304,6 +304,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN,              "File inconsistance in
 TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION,   "One valid time range condition expected")
 TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR,                "System error")
 TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RESULT_TOO_LARGE,         "result num is too large")
+TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RSS_THRESHOLD,            "Exceed Memory RSS threshold when creating query")

 // grant
 TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED,                "License expired")