Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
05a12b63
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
05a12b63
编写于
1月 03, 2021
作者:
sangshuduo
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-2598] feature: C# taosdemo
support multithread create table and insertion.
上级
980f5862
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
314 addition
and
92 deletion
+314
-92
tests/examples/C#/taosdemo/taosdemo.cs
tests/examples/C#/taosdemo/taosdemo.cs
+314
-92
未找到文件。
tests/examples/C#/taosdemo/taosdemo.cs
浏览文件 @
05a12b63
...
@@ -18,6 +18,8 @@ using System.Text;
...
@@ -18,6 +18,8 @@ using System.Text;
using
System.Collections.Generic
;
using
System.Collections.Generic
;
using
System.Runtime.InteropServices
;
using
System.Runtime.InteropServices
;
using
System.Collections
;
using
System.Collections
;
using
System.Threading
;
using
System.Diagnostics
;
namespace
TDengineDriver
namespace
TDengineDriver
{
{
...
@@ -31,28 +33,29 @@ namespace TDengineDriver
...
@@ -31,28 +33,29 @@ namespace TDengineDriver
private
short
port
=
0
;
private
short
port
=
0
;
//sql parameters
//sql parameters
private
string
dbName
;
private
string
dbName
=
"db"
;
private
string
stableName
;
private
string
stableName
=
"st"
;
private
string
tablePrefix
;
private
string
tablePrefix
=
"t"
;
private
bool
isInsertOnly
=
false
;
private
bool
isInsertOnly
=
false
;
private
int
queryMode
=
1
;
private
int
queryMode
=
1
;
private
long
recordsPerTable
=
1
;
private
long
recordsPerTable
=
1
0000
;
private
int
recordsPerRequest
=
1
;
private
int
recordsPerRequest
=
1
;
private
int
colsPerRecord
=
3
;
private
int
colsPerRecord
=
3
;
private
long
batchRows
;
private
long
batchRows
=
1000
;
private
long
numOfTables
;
private
long
numOfTables
=
10000
;
private
long
beginTimestamp
=
1551369600000L
;
private
IntPtr
conn
=
IntPtr
.
Zero
;
private
IntPtr
conn
=
IntPtr
.
Zero
;
private
long
rowsInserted
=
0
;
//
private long rowsInserted = 0;
private
bool
useStable
=
false
;
private
bool
useStable
=
false
;
private
short
methodOfDelete
=
0
;
private
short
methodOfDelete
=
0
;
private
short
numOfThreads
=
1
;
private
long
numOfThreads
=
1
;
private
long
rateOfOutorder
=
0
;
private
long
rateOfOutorder
=
0
;
private
bool
order
=
true
;
private
bool
order
=
true
;
private
bool
skipReadKey
=
false
;
private
bool
skipReadKey
=
false
;
private
bool
verbose
=
false
;
static
void
PrintHelp
(
String
[]
argv
)
static
void
PrintHelp
(
String
[]
argv
)
{
{
...
@@ -109,6 +112,8 @@ namespace TDengineDriver
...
@@ -109,6 +112,8 @@ namespace TDengineDriver
Console
.
Write
(
"{0}{1}{2}\n"
,
indent
,
indent
,
"rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50."
);
Console
.
Write
(
"{0}{1}{2}\n"
,
indent
,
indent
,
"rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50."
);
Console
.
Write
(
"{0}{1}"
,
indent
,
"-D"
);
Console
.
Write
(
"{0}{1}"
,
indent
,
"-D"
);
Console
.
Write
(
"{0}{1}{2}\n"
,
indent
,
indent
,
"Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database."
);
Console
.
Write
(
"{0}{1}{2}\n"
,
indent
,
indent
,
"Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database."
);
Console
.
Write
(
"{0}{1}"
,
indent
,
"-v"
);
Console
.
Write
(
"{0}{1}{2}\n"
,
indent
,
indent
,
"Print verbose output"
);
Console
.
Write
(
"{0}{1}"
,
indent
,
"-y"
);
Console
.
Write
(
"{0}{1}"
,
indent
,
"-y"
);
Console
.
Write
(
"{0}{1}{2}\n"
,
indent
,
indent
,
"Skip read key for continous test, default is not skip"
);
Console
.
Write
(
"{0}{1}{2}\n"
,
indent
,
indent
,
"Skip read key for continous test, default is not skip"
);
...
@@ -125,12 +130,12 @@ namespace TDengineDriver
...
@@ -125,12 +130,12 @@ namespace TDengineDriver
password
=
this
.
GetArgumentAsString
(
argv
,
"-P"
,
"taosdata"
);
password
=
this
.
GetArgumentAsString
(
argv
,
"-P"
,
"taosdata"
);
dbName
=
this
.
GetArgumentAsString
(
argv
,
"-d"
,
"db"
);
dbName
=
this
.
GetArgumentAsString
(
argv
,
"-d"
,
"db"
);
stableName
=
this
.
GetArgumentAsString
(
argv
,
"-s"
,
"st"
);
stableName
=
this
.
GetArgumentAsString
(
argv
,
"-s"
,
"st"
);
tablePrefix
=
this
.
GetArgumentAsString
(
argv
,
"-
t
"
,
"t"
);
tablePrefix
=
this
.
GetArgumentAsString
(
argv
,
"-
m
"
,
"t"
);
isInsertOnly
=
this
.
GetArgumentAsFlag
(
argv
,
"-x"
);
isInsertOnly
=
this
.
GetArgumentAsFlag
(
argv
,
"-x"
);
queryMode
=
(
int
)
this
.
GetArgumentAsLong
(
argv
,
"-q"
,
0
,
1
,
0
);
queryMode
=
(
int
)
this
.
GetArgumentAsLong
(
argv
,
"-q"
,
0
,
1
,
0
);
numOfTables
=
this
.
GetArgumentAsLong
(
argv
,
"-t"
,
1
,
10000
,
1
0
);
numOfTables
=
this
.
GetArgumentAsLong
(
argv
,
"-t"
,
1
,
10000
00000
,
1000
0
);
batchRows
=
this
.
GetArgumentAsLong
(
argv
,
"-r"
,
1
,
10000
,
1
);
batchRows
=
this
.
GetArgumentAsLong
(
argv
,
"-r"
,
1
,
10000
,
1
000
);
recordsPerTable
=
this
.
GetArgumentAsLong
(
argv
,
"-n"
,
1
,
100000000000
,
1
);
recordsPerTable
=
this
.
GetArgumentAsLong
(
argv
,
"-n"
,
1
,
100000000000
,
1
0000
);
recordsPerRequest
=
(
int
)
this
.
GetArgumentAsLong
(
argv
,
"-r"
,
1
,
10000
,
1
);
recordsPerRequest
=
(
int
)
this
.
GetArgumentAsLong
(
argv
,
"-r"
,
1
,
10000
,
1
);
colsPerRecord
=
(
int
)
this
.
GetArgumentAsLong
(
argv
,
"-l"
,
1
,
1024
,
3
);
colsPerRecord
=
(
int
)
this
.
GetArgumentAsLong
(
argv
,
"-l"
,
1
,
1024
,
3
);
configDir
=
this
.
GetArgumentAsString
(
argv
,
"-c"
,
"C:/TDengine/cfg"
);
configDir
=
this
.
GetArgumentAsString
(
argv
,
"-c"
,
"C:/TDengine/cfg"
);
...
@@ -142,6 +147,7 @@ namespace TDengineDriver
...
@@ -142,6 +147,7 @@ namespace TDengineDriver
rateOfOutorder
=
this
.
GetArgumentAsLong
(
argv
,
"-R"
,
0
,
100
,
0
);
rateOfOutorder
=
this
.
GetArgumentAsLong
(
argv
,
"-R"
,
0
,
100
,
0
);
skipReadKey
=
this
.
GetArgumentAsFlag
(
argv
,
"-y"
);
skipReadKey
=
this
.
GetArgumentAsFlag
(
argv
,
"-y"
);
verbose
=
this
.
GetArgumentAsFlag
(
argv
,
"-v"
);
Console
.
Write
(
"###################################################################\n"
);
Console
.
Write
(
"###################################################################\n"
);
Console
.
Write
(
"# Server IP: {0}\n"
,
host
);
Console
.
Write
(
"# Server IP: {0}\n"
,
host
);
...
@@ -160,6 +166,7 @@ namespace TDengineDriver
...
@@ -160,6 +166,7 @@ namespace TDengineDriver
Console
.
Write
(
"# Delete method: {0}\n"
,
methodOfDelete
);
Console
.
Write
(
"# Delete method: {0}\n"
,
methodOfDelete
);
Console
.
Write
(
"# Query Mode: {0}\n"
,
queryMode
);
Console
.
Write
(
"# Query Mode: {0}\n"
,
queryMode
);
Console
.
Write
(
"# Insert Only: {0}\n"
,
isInsertOnly
);
Console
.
Write
(
"# Insert Only: {0}\n"
,
isInsertOnly
);
Console
.
Write
(
"# Verbose output {0}\n"
,
verbose
);
Console
.
Write
(
"# Test time: {0}\n"
,
DateTime
.
Now
.
ToString
(
"h:mm:ss tt"
));
Console
.
Write
(
"# Test time: {0}\n"
,
DateTime
.
Now
.
ToString
(
"h:mm:ss tt"
));
Console
.
Write
(
"###################################################################\n"
);
Console
.
Write
(
"###################################################################\n"
);
...
@@ -179,7 +186,7 @@ namespace TDengineDriver
...
@@ -179,7 +186,7 @@ namespace TDengineDriver
if
(
argName
==
argv
[
i
])
if
(
argName
==
argv
[
i
])
{
{
return
true
;
return
true
;
}
}
}
}
return
false
;
return
false
;
}
}
...
@@ -246,18 +253,35 @@ namespace TDengineDriver
...
@@ -246,18 +253,35 @@ namespace TDengineDriver
System
.
Environment
.
Exit
(
0
);
System
.
Environment
.
Exit
(
0
);
}
}
private
void
DebugPrintFormat
(
string
format
,
params
object
[]
parameters
)
{
if
(
verbose
==
true
)
{
Console
.
Write
(
format
,
parameters
);
}
}
private
void
DebugPrint
(
string
str
)
{
if
(
verbose
==
true
)
{
Console
.
Write
(
str
);
}
}
public
void
InitTDengine
()
public
void
InitTDengine
()
{
{
TDengine
.
Options
((
int
)
TDengineInitOption
.
TDDB_OPTION_CONFIGDIR
,
this
.
configDir
);
TDengine
.
Options
((
int
)
TDengineInitOption
.
TDDB_OPTION_CONFIGDIR
,
this
.
configDir
);
TDengine
.
Options
((
int
)
TDengineInitOption
.
TDDB_OPTION_SHELL_ACTIVITY_TIMER
,
"60"
);
TDengine
.
Options
((
int
)
TDengineInitOption
.
TDDB_OPTION_SHELL_ACTIVITY_TIMER
,
"60"
);
TDengine
.
Init
();
TDengine
.
Init
();
Console
.
WriteLine
(
"TDengine Initialization finished
"
);
DebugPrint
(
"TDengine Initialization finished\n
"
);
}
}
public
void
ConnectTDengine
()
public
void
ConnectTDengine
()
{
{
string
db
=
""
;
string
db
=
""
;
Console
.
WriteLine
(
"host:{0} user:{1}, pass:{2}; db:{3}, port:{4}"
,
this
.
host
,
this
.
user
,
this
.
password
,
db
,
this
.
port
);
DebugPrintFormat
(
"host:{0} user:{1}, pass:{2}; db:{3}, port:{4}"
,
this
.
host
,
this
.
user
,
this
.
password
,
db
,
this
.
port
);
this
.
conn
=
TDengine
.
Connect
(
this
.
host
,
this
.
user
,
this
.
password
,
db
,
this
.
port
);
this
.
conn
=
TDengine
.
Connect
(
this
.
host
,
this
.
user
,
this
.
password
,
db
,
this
.
port
);
if
(
this
.
conn
==
IntPtr
.
Zero
)
if
(
this
.
conn
==
IntPtr
.
Zero
)
{
{
...
@@ -266,34 +290,84 @@ namespace TDengineDriver
...
@@ -266,34 +290,84 @@ namespace TDengineDriver
}
}
else
else
{
{
Console
.
WriteLine
(
"Connect to TDengine success
"
);
DebugPrint
(
"Connect to TDengine success\n
"
);
}
}
}
}
public
void
CreateTablesByThreads
()
public
void
CreateTablesByThreads
()
{
{
StringBuilder
sql
=
new
StringBuilder
()
;
Thread
[]
threadArr
=
new
Thread
[
numOfThreads
]
;
sql
.
Clear
();
long
quotition
=
numOfTables
/
numOfThreads
;
sql
.
Append
(
"use "
).
Append
(
this
.
dbName
);
if
(
quotition
<
1
)
{
numOfThreads
=
numOfTables
;
quotition
=
1
;
}
long
remainder
=
0
;
if
(
numOfThreads
!=
0
)
{
remainder
=
numOfTables
%
numOfThreads
;
}
long
last
=
0
;
for
(
int
i
=
0
;
i
<
numOfThreads
;
i
++)
{
CreateTableThread
createTableThread
=
new
CreateTableThread
();
createTableThread
.
id
=
i
;
createTableThread
.
verbose
=
verbose
;
createTableThread
.
dbName
=
this
.
dbName
;
createTableThread
.
tablePrefix
=
this
.
tablePrefix
;
if
(
useStable
)
{
createTableThread
.
stableName
=
stableName
;
}
createTableThread
.
conn
=
conn
;
createTableThread
.
start
=
last
;
if
(
i
<
remainder
)
{
createTableThread
.
end
=
last
+
quotition
;
}
else
{
createTableThread
.
end
=
last
+
quotition
-
1
;
}
last
=
createTableThread
.
end
+
1
;
threadArr
[
i
]
=
new
Thread
(
createTableThread
.
ThreadMain
);
threadArr
[
i
].
Start
();
threadArr
[
i
].
Join
();
}
}
public
void
dropDatabase
()
{
StringBuilder
sql
=
new
StringBuilder
();
sql
.
Append
(
"DROP DATABASE IF EXISTS "
).
Append
(
this
.
dbName
);
IntPtr
res
=
TDengine
.
Query
(
this
.
conn
,
sql
.
ToString
());
IntPtr
res
=
TDengine
.
Query
(
this
.
conn
,
sql
.
ToString
());
if
(
res
!=
IntPtr
.
Zero
)
if
(
res
!=
IntPtr
.
Zero
)
{
{
Console
.
WriteLine
(
sql
.
ToString
()
+
" success
"
);
DebugPrint
(
sql
.
ToString
()
+
" success\n
"
);
}
}
else
else
{
{
Console
.
WriteLine
(
sql
.
ToString
()
+
" failure, reason: "
+
TDengine
.
Error
(
res
));
Console
.
WriteLine
(
sql
.
ToString
()
+
" failure, reason: "
+
TDengine
.
Error
(
res
));
ExitProgram
();
ExitProgram
();
}
}
TDengine
.
FreeResult
(
res
);
sql
.
Clear
();
}
sql
.
Append
(
"create table if not exists "
).
Append
(
this
.
stableName
).
Append
(
"(ts timestamp, v1 bool, v2 tinyint, v3 smallint, v4 int, v5 bigint, v6 float, v7 double, v8 binary(10), v9 nchar(10)) tags(t1 int)"
);
res
=
TDengine
.
Query
(
this
.
conn
,
sql
.
ToString
());
public
void
CreateDb
()
{
StringBuilder
sql
=
new
StringBuilder
();
sql
.
Append
(
"CREATE DATABASE IF NOT EXISTS "
).
Append
(
this
.
dbName
);
IntPtr
res
=
TDengine
.
Query
(
this
.
conn
,
sql
.
ToString
());
if
(
res
!=
IntPtr
.
Zero
)
if
(
res
!=
IntPtr
.
Zero
)
{
{
Console
.
WriteLine
(
sql
.
ToString
()
+
" success
"
);
DebugPrint
(
sql
.
ToString
()
+
" success\n
"
);
}
}
else
else
{
{
...
@@ -301,36 +375,20 @@ namespace TDengineDriver
...
@@ -301,36 +375,20 @@ namespace TDengineDriver
ExitProgram
();
ExitProgram
();
}
}
TDengine
.
FreeResult
(
res
);
TDengine
.
FreeResult
(
res
);
for
(
int
i
=
0
;
i
<
this
.
numOfTables
;
i
++)
{
sql
.
Clear
();
sql
=
sql
.
Append
(
"create table if not exists "
).
Append
(
this
.
tablePrefix
).
Append
(
i
)
.
Append
(
" using "
).
Append
(
this
.
stableName
).
Append
(
" tags("
).
Append
(
i
).
Append
(
")"
);
res
=
TDengine
.
Query
(
this
.
conn
,
sql
.
ToString
());
if
(
res
!=
IntPtr
.
Zero
)
{
Console
.
WriteLine
(
sql
.
ToString
()
+
" success"
);
}
else
{
Console
.
WriteLine
(
sql
.
ToString
()
+
" failure, reason: "
+
TDengine
.
Error
(
res
));
ExitProgram
();
}
TDengine
.
FreeResult
(
res
);
}
Console
.
WriteLine
(
"create db and table success"
);
}
}
public
void
Create
Db
()
public
void
Create
Stable
()
{
{
StringBuilder
sql
=
new
StringBuilder
();
StringBuilder
sql
=
new
StringBuilder
();
sql
.
Append
(
"create database if not exists "
).
Append
(
this
.
dbName
);
sql
.
Clear
();
sql
.
Append
(
"CREATE TABLE IF NOT EXISTS "
).
Append
(
this
.
dbName
).
Append
(
"."
).
Append
(
this
.
stableName
).
Append
(
"(ts timestamp, v1 bool, v2 tinyint, v3 smallint, v4 int, v5 bigint, v6 float, v7 double, v8 binary(10), v9 nchar(10)) tags(t1 int)"
);
IntPtr
res
=
TDengine
.
Query
(
this
.
conn
,
sql
.
ToString
());
IntPtr
res
=
TDengine
.
Query
(
this
.
conn
,
sql
.
ToString
());
if
(
res
!=
IntPtr
.
Zero
)
if
(
res
!=
IntPtr
.
Zero
)
{
{
Console
.
WriteLine
(
sql
.
ToString
()
+
" success
"
);
DebugPrint
(
sql
.
ToString
()
+
" success\n
"
);
}
}
else
else
{
{
...
@@ -340,56 +398,67 @@ namespace TDengineDriver
...
@@ -340,56 +398,67 @@ namespace TDengineDriver
TDengine
.
FreeResult
(
res
);
TDengine
.
FreeResult
(
res
);
}
}
public
void
InsertByThreads
()
public
void
ExecuteInsertByThreads
()
{
{
System
.
DateTime
start
=
new
System
.
DateTime
();
Thread
[]
threadArr
=
new
Thread
[
numOfThreads
];
long
loopCount
=
this
.
recordsPerTable
/
this
.
batchRows
;
for
(
int
table
=
0
;
table
<
this
.
numOfTables
;
++
table
)
long
quotition
=
numOfTables
/
numOfThreads
;
if
(
quotition
<
1
)
{
{
for
(
long
loop
=
0
;
loop
<
loopCount
;
loop
++)
numOfThreads
=
numOfTables
;
{
quotition
=
1
;
StringBuilder
sql
=
new
StringBuilder
();
}
sql
.
Append
(
"insert into "
).
Append
(
this
.
tablePrefix
).
Append
(
table
).
Append
(
" values"
);
for
(
int
batch
=
0
;
batch
<
this
.
batchRows
;
++
batch
)
{
long
rows
=
loop
*
this
.
batchRows
+
batch
;
sql
.
Append
(
"("
)
.
Append
(
this
.
beginTimestamp
+
rows
)
.
Append
(
", 1, 2, 3,"
)
.
Append
(
rows
)
.
Append
(
", 5, 6, 7, 'abc', 'def')"
);
}
IntPtr
res
=
TDengine
.
Query
(
this
.
conn
,
sql
.
ToString
());
if
(
res
==
IntPtr
.
Zero
)
{
Console
.
WriteLine
(
sql
.
ToString
()
+
" failure, reason: "
+
TDengine
.
Error
(
res
));
}
int
affectRows
=
TDengine
.
AffectRows
(
res
);
long
remainder
=
0
;
this
.
rowsInserted
+=
affectRows
;
if
(
numOfThreads
!=
0
)
{
remainder
=
numOfTables
%
numOfThreads
;
}
TDengine
.
FreeResult
(
res
);
long
last
=
0
;
for
(
int
i
=
0
;
i
<
numOfThreads
;
i
++)
{
InsertDataThread
insertThread
=
new
InsertDataThread
();
insertThread
.
id
=
i
;
insertThread
.
recordsPerTable
=
recordsPerTable
;
insertThread
.
batchRows
=
batchRows
;
insertThread
.
numOfTables
=
numOfTables
;
insertThread
.
verbose
=
verbose
;
insertThread
.
dbName
=
this
.
dbName
;
insertThread
.
tablePrefix
=
this
.
tablePrefix
;
if
(
useStable
)
{
insertThread
.
stableName
=
stableName
;
}
}
}
insertThread
.
conn
=
conn
;
System
.
DateTime
end
=
new
System
.
DateTime
();
insertThread
.
start
=
last
;
TimeSpan
ts
=
end
-
start
;
if
(
i
<
remainder
)
{
insertThread
.
end
=
last
+
quotition
;
}
else
{
insertThread
.
end
=
last
+
quotition
-
1
;
}
last
=
insertThread
.
end
+
1
;
Console
.
Write
(
"Total {0:G} rows inserted, {1:G} rows failed, time spend {2:G} seconds.\n"
threadArr
[
i
]
=
new
Thread
(
insertThread
.
ThreadMain
);
,
this
.
rowsInserted
,
this
.
recordsPerTable
*
this
.
numOfTables
-
this
.
rowsInserted
,
ts
.
TotalSeconds
);
threadArr
[
i
].
Start
();
threadArr
[
i
].
Join
();
}
}
}
public
void
ExecuteQuery
()
public
void
ExecuteQuery
()
{
{
System
.
DateTime
start
=
new
System
.
DateTime
();
//
System.DateTime start = new System.DateTime();
long
queryRows
=
0
;
long
queryRows
=
0
;
for
(
int
i
=
0
;
i
<
1
/*this.numOfTables*/
;
++
i
)
for
(
int
i
=
0
;
i
<
1
/*this.numOfTables*/
;
++
i
)
{
{
String
sql
=
"select * from "
+
this
.
dbName
+
"."
+
tablePrefix
+
i
;
String
sql
=
"select * from "
+
this
.
dbName
+
"."
+
tablePrefix
+
i
;
Console
.
WriteLine
(
sql
);
//
Console.WriteLine(sql);
IntPtr
res
=
TDengine
.
Query
(
conn
,
sql
);
IntPtr
res
=
TDengine
.
Query
(
conn
,
sql
);
if
(
res
==
IntPtr
.
Zero
)
if
(
res
==
IntPtr
.
Zero
)
...
@@ -399,13 +468,13 @@ namespace TDengineDriver
...
@@ -399,13 +468,13 @@ namespace TDengineDriver
}
}
int
fieldCount
=
TDengine
.
FieldCount
(
res
);
int
fieldCount
=
TDengine
.
FieldCount
(
res
);
Console
.
WriteLine
(
"field count: "
+
fieldCount
);
//
Console.WriteLine("field count: " + fieldCount);
List
<
TDengineMeta
>
metas
=
TDengine
.
FetchFields
(
res
);
List
<
TDengineMeta
>
metas
=
TDengine
.
FetchFields
(
res
);
for
(
int
j
=
0
;
j
<
metas
.
Count
;
j
++)
for
(
int
j
=
0
;
j
<
metas
.
Count
;
j
++)
{
{
TDengineMeta
meta
=
(
TDengineMeta
)
metas
[
j
];
TDengineMeta
meta
=
(
TDengineMeta
)
metas
[
j
];
Console
.
WriteLine
(
"index:"
+
j
+
", type:"
+
meta
.
type
+
", typename:"
+
meta
.
TypeName
()
+
", name:"
+
meta
.
name
+
", size:"
+
meta
.
size
);
//
Console.WriteLine("index:" + j + ", type:" + meta.type + ", typename:" + meta.TypeName() + ", name:" + meta.name + ", size:" + meta.size);
}
}
IntPtr
rowdata
;
IntPtr
rowdata
;
...
@@ -482,17 +551,19 @@ namespace TDengineDriver
...
@@ -482,17 +551,19 @@ namespace TDengineDriver
if
(
TDengine
.
ErrorNo
(
res
)
!=
0
)
if
(
TDengine
.
ErrorNo
(
res
)
!=
0
)
{
{
Console
.
Write
(
"Query is not complete, Error {0:G}"
,
TDengine
.
ErrorNo
(
res
),
TDengine
.
Error
(
res
));
Console
.
Write
(
"Query is not complete, Error {0:G}"
,
TDengine
.
ErrorNo
(
res
),
TDengine
.
Error
(
res
));
}
}
TDengine
.
FreeResult
(
res
);
TDengine
.
FreeResult
(
res
);
}
}
/*
System.DateTime end = new System.DateTime();
TimeSpan ts = end - start;
System
.
DateTime
end
=
new
System
.
DateTime
();
Console.Write("Total {0:G} rows inserted, {1:G} rows query, time spend {2:G} seconds.\n"
TimeSpan
ts
=
end
-
start
;
, this.rowsInserted, queryRows, ts.TotalSeconds);
*/
Console
.
Write
(
"Total {0:G} rows inserted, {1:G} rows query, time spend {2:G} seconds.\n"
,
this
.
rowsInserted
,
queryRows
,
ts
.
TotalSeconds
);
}
}
public
void
CloseConnection
()
public
void
CloseConnection
()
...
@@ -513,16 +584,167 @@ namespace TDengineDriver
...
@@ -513,16 +584,167 @@ namespace TDengineDriver
tester
.
InitTDengine
();
tester
.
InitTDengine
();
tester
.
ConnectTDengine
();
tester
.
ConnectTDengine
();
tester
.
dropDatabase
();
tester
.
CreateDb
();
tester
.
CreateDb
();
if
(
tester
.
useStable
==
true
)
{
tester
.
CreateStable
();
}
tester
.
CreateTablesByThreads
();
tester
.
CreateTablesByThreads
();
tester
.
ExecuteInsertByThreads
();
Stopwatch
watch
=
Stopwatch
.
StartNew
();
tester
.
InsertByThreads
();
watch
.
Stop
();
double
elapsedMs
=
watch
.
Elapsed
.
TotalMilliseconds
;
Console
.
WriteLine
(
"Spent {0} seconds to insert {1} records with {2} record(s) per request: {3} records/second"
,
elapsedMs
/
1000
,
tester
.
recordsPerTable
*
tester
.
numOfTables
,
tester
.
batchRows
,
(
tester
.
recordsPerTable
*
tester
.
numOfTables
*
1000
)
/
elapsedMs
);
tester
.
ExecuteQuery
();
tester
.
ExecuteQuery
();
tester
.
CloseConnection
();
tester
.
CloseConnection
();
Console
.
WriteLine
(
"End."
);
Console
.
WriteLine
(
"End."
);
}
}
public
class
InsertDataThread
{
public
long
id
{
set
;
get
;
}
public
long
start
{
set
;
get
;
}
public
long
end
{
set
;
get
;
}
public
string
dbName
{
set
;
get
;
}
public
IntPtr
conn
{
set
;
get
;
}
public
string
tablePrefix
{
set
;
get
;
}
public
string
stableName
{
set
;
get
;
}
public
long
recordsPerTable
{
set
;
get
;
}
public
long
batchRows
{
set
;
get
;
}
public
long
numOfTables
{
set
;
get
;
}
public
bool
verbose
{
set
;
get
;
}
private
void
DebugPrintFormat
(
string
format
,
params
object
[]
parameters
)
{
if
(
verbose
==
true
)
{
Console
.
Write
(
format
,
parameters
);
}
}
private
void
DebugPrint
(
string
str
)
{
if
(
verbose
==
true
)
{
Console
.
Write
(
str
);
}
}
public
void
ThreadMain
()
{
DebugPrintFormat
(
"InsertDataThread {0} from {1} to {2}"
,
id
,
start
,
end
);
StringBuilder
sql
=
new
StringBuilder
();
long
beginTimestamp
=
1551369600000L
;
long
rowsInserted
=
0
;
// System.DateTime startTime = new System.DateTime();
long
i
=
0
;
while
(
i
<
recordsPerTable
)
{
for
(
long
table
=
start
;
table
<=
end
;
++
table
)
{
long
inserted
=
i
;
sql
.
Clear
();
sql
.
Append
(
"INSERT INTO "
).
Append
(
this
.
dbName
).
Append
(
"."
).
Append
(
this
.
tablePrefix
).
Append
(
table
).
Append
(
" VALUES"
);
for
(
int
batch
=
0
;
batch
<
this
.
batchRows
;
++
batch
)
{
sql
.
Append
(
"("
)
.
Append
(
beginTimestamp
+
i
+
batch
)
.
Append
(
", 1, 2, 3,"
)
.
Append
(
i
+
batch
)
.
Append
(
", 5, 6, 7, 'abc', 'def')"
);
}
IntPtr
res
=
TDengine
.
Query
(
this
.
conn
,
sql
.
ToString
());
if
(
res
==
IntPtr
.
Zero
)
{
DebugPrint
(
sql
.
ToString
()
+
" failure, reason: "
+
TDengine
.
Error
(
res
)
+
"\n"
);
}
inserted
+=
this
.
batchRows
;
int
affectRows
=
TDengine
.
AffectRows
(
res
);
rowsInserted
+=
affectRows
;
TDengine
.
FreeResult
(
res
);
if
(
table
==
end
)
{
i
=
inserted
;
}
}
}
}
}
public
class
CreateTableThread
{
public
long
id
{
set
;
get
;
}
public
long
start
{
set
;
get
;
}
public
long
end
{
set
;
get
;
}
public
string
dbName
{
set
;
get
;
}
public
IntPtr
conn
{
set
;
get
;
}
public
string
tablePrefix
{
set
;
get
;
}
public
string
stableName
{
set
;
get
;
}
public
bool
verbose
{
set
;
get
;
}
private
void
DebugPrintFormat
(
string
format
,
params
object
[]
parameters
)
{
if
(
verbose
==
true
)
{
Console
.
Write
(
format
,
parameters
);
}
}
private
void
DebugPrint
(
string
str
)
{
if
(
verbose
==
true
)
{
Console
.
Write
(
str
);
}
}
public
void
ThreadMain
()
{
DebugPrintFormat
(
"CreateTable {0} from {1} to {2}"
,
id
,
start
,
end
);
StringBuilder
sql
=
new
StringBuilder
();
for
(
long
tableId
=
start
;
tableId
<=
end
;
tableId
++)
{
sql
.
Clear
();
sql
=
sql
.
Append
(
"CREATE TABLE IF NOT EXISTS "
).
Append
(
this
.
dbName
).
Append
(
"."
).
Append
(
this
.
tablePrefix
).
Append
(
tableId
).
Append
(
" USING "
).
Append
(
this
.
dbName
).
Append
(
"."
).
Append
(
this
.
stableName
).
Append
(
" TAGS("
).
Append
(
tableId
).
Append
(
")"
);
IntPtr
res
=
TDengine
.
Query
(
this
.
conn
,
sql
.
ToString
());
if
(
res
!=
IntPtr
.
Zero
)
{
DebugPrint
(
sql
.
ToString
()
+
" success\n"
);
}
else
{
DebugPrint
(
sql
.
ToString
()
+
" failure, reason: "
+
TDengine
.
Error
(
res
)
+
"\n"
);
ExitProgram
();
}
TDengine
.
FreeResult
(
res
);
}
}
}
}
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录