diff --git a/tests/examples/C#/taosdemo/taosdemo.cs b/tests/examples/C#/taosdemo/taosdemo.cs index 0a3632a71868cffa769ef24857cd509334df1eeb..8e48fa2c8f2d238b7aebf8cba6bf455cb6b890be 100644 --- a/tests/examples/C#/taosdemo/taosdemo.cs +++ b/tests/examples/C#/taosdemo/taosdemo.cs @@ -18,6 +18,8 @@ using System.Text; using System.Collections.Generic; using System.Runtime.InteropServices; using System.Collections; +using System.Threading; +using System.Diagnostics; namespace TDengineDriver { @@ -31,28 +33,29 @@ namespace TDengineDriver private short port = 0; //sql parameters - private string dbName; - private string stableName; - private string tablePrefix; + private string dbName = "db"; + private string stableName = "st"; + private string tablePrefix = "t"; private bool isInsertOnly = false; private int queryMode = 1; - private long recordsPerTable = 1; + private long recordsPerTable = 10000; private int recordsPerRequest = 1; private int colsPerRecord = 3; - private long batchRows; - private long numOfTables; - private long beginTimestamp = 1551369600000L; + private long batchRows = 1000; + private long numOfTables = 10000; private IntPtr conn = IntPtr.Zero; - private long rowsInserted = 0; + // private long rowsInserted = 0; private bool useStable = false; private short methodOfDelete = 0; - private short numOfThreads = 1; + private long numOfThreads = 1; private long rateOfOutorder = 0; private bool order = true; private bool skipReadKey = false; + private bool verbose = false; + static void PrintHelp(String[] argv) { @@ -109,6 +112,8 @@ namespace TDengineDriver Console.Write("{0}{1}{2}\n", indent, indent, "rate, Out of order data's rate--if order=1 Default 10, min: 0, max: 50."); Console.Write("{0}{1}", indent, "-D"); Console.Write("{0}{1}{2}\n", indent, indent, "Delete data methods 0: don't delete, 1: delete by table, 2: delete by stable, 3: delete by database."); + Console.Write("{0}{1}", indent, "-v"); + Console.Write("{0}{1}{2}\n", indent, indent, "Print verbose output"); Console.Write("{0}{1}", indent, "-y"); Console.Write("{0}{1}{2}\n", indent, indent, "Skip read key for continous test, default is not skip"); @@ -125,12 +130,12 @@ namespace TDengineDriver password = this.GetArgumentAsString(argv, "-P", "taosdata"); dbName = this.GetArgumentAsString(argv, "-d", "db"); stableName = this.GetArgumentAsString(argv, "-s", "st"); - tablePrefix = this.GetArgumentAsString(argv, "-t", "t"); + tablePrefix = this.GetArgumentAsString(argv, "-m", "t"); isInsertOnly = this.GetArgumentAsFlag(argv, "-x"); queryMode = (int)this.GetArgumentAsLong(argv, "-q", 0, 1, 0); - numOfTables = this.GetArgumentAsLong(argv, "-t", 1, 10000, 10); - batchRows = this.GetArgumentAsLong(argv, "-r", 1, 10000, 1); - recordsPerTable = this.GetArgumentAsLong(argv, "-n", 1, 100000000000, 1); + numOfTables = this.GetArgumentAsLong(argv, "-t", 1, 1000000000, 10000); + batchRows = this.GetArgumentAsLong(argv, "-r", 1, 10000, 1000); + recordsPerTable = this.GetArgumentAsLong(argv, "-n", 1, 100000000000, 10000); recordsPerRequest = (int)this.GetArgumentAsLong(argv, "-r", 1, 10000, 1); colsPerRecord = (int)this.GetArgumentAsLong(argv, "-l", 1, 1024, 3); configDir = this.GetArgumentAsString(argv, "-c", "C:/TDengine/cfg"); @@ -142,6 +147,7 @@ namespace TDengineDriver rateOfOutorder = this.GetArgumentAsLong(argv, "-R", 0, 100, 0); skipReadKey = this.GetArgumentAsFlag(argv, "-y"); + verbose = this.GetArgumentAsFlag(argv, "-v"); Console.Write("###################################################################\n"); Console.Write("# Server IP: {0}\n", host); @@ -160,6 +166,7 @@ namespace TDengineDriver Console.Write("# Delete method: {0}\n", methodOfDelete); Console.Write("# Query Mode: {0}\n", queryMode); Console.Write("# Insert Only: {0}\n", isInsertOnly); + Console.Write("# Verbose output {0}\n", verbose); Console.Write("# Test time: {0}\n", DateTime.Now.ToString("h:mm:ss tt")); Console.Write("###################################################################\n"); @@ -179,7 +186,7 @@ namespace TDengineDriver if (argName == argv[i]) { return true; - } + } } return false; } @@ -246,18 +253,35 @@ namespace TDengineDriver System.Environment.Exit(0); } + private void DebugPrintFormat(string format, params object[] parameters) + { + if (verbose == true) + { + Console.Write(format, parameters); + } + } + + private void DebugPrint(string str) + { + if (verbose == true) + { + Console.Write(str); + } + } + public void InitTDengine() { TDengine.Options((int)TDengineInitOption.TDDB_OPTION_CONFIGDIR, this.configDir); TDengine.Options((int)TDengineInitOption.TDDB_OPTION_SHELL_ACTIVITY_TIMER, "60"); TDengine.Init(); - Console.WriteLine("TDengine Initialization finished"); + DebugPrint("TDengine Initialization finished\n"); } public void ConnectTDengine() { string db = ""; - Console.WriteLine("host:{0} user:{1}, pass:{2}; db:{3}, port:{4}", this.host, this.user, this.password, db, this.port); + DebugPrintFormat("host:{0} user:{1}, pass:{2}; db:{3}, port:{4}", + this.host, this.user, this.password, db, this.port); this.conn = TDengine.Connect(this.host, this.user, this.password, db, this.port); if (this.conn == IntPtr.Zero) { @@ -266,34 +290,84 @@ namespace TDengineDriver } else { - Console.WriteLine("Connect to TDengine success"); + DebugPrint("Connect to TDengine success\n"); } } public void CreateTablesByThreads() { - StringBuilder sql = new StringBuilder(); + Thread[] threadArr = new Thread[numOfThreads]; - sql.Clear(); - sql.Append("use ").Append(this.dbName); + long quotition = numOfTables / numOfThreads; + if (quotition < 1) + { + numOfThreads = numOfTables; + quotition = 1; + } + + long remainder = 0; + if (numOfThreads != 0) + { + remainder = numOfTables % numOfThreads; + } + + long last = 0; + + for (int i = 0; i < numOfThreads; i++) + { + CreateTableThread createTableThread = new CreateTableThread(); + createTableThread.id = i; + createTableThread.verbose = verbose; + createTableThread.dbName = this.dbName; + createTableThread.tablePrefix = this.tablePrefix; + if (useStable) + { + createTableThread.stableName = stableName; + } + createTableThread.conn = conn; + + createTableThread.start = last; + if (i < remainder) + { + createTableThread.end = last + quotition; + } + else + { + createTableThread.end = last + quotition - 1; + } + last = createTableThread.end + 1; + + threadArr[i] = new Thread(createTableThread.ThreadMain); + threadArr[i].Start(); + threadArr[i].Join(); + } + } + + public void dropDatabase() + { + StringBuilder sql = new StringBuilder(); + sql.Append("DROP DATABASE IF EXISTS ").Append(this.dbName); IntPtr res = TDengine.Query(this.conn, sql.ToString()); if (res != IntPtr.Zero) { - Console.WriteLine(sql.ToString() + " success"); + DebugPrint(sql.ToString() + " success\n"); } else { Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res)); ExitProgram(); } - TDengine.FreeResult(res); - sql.Clear(); - sql.Append("create table if not exists ").Append(this.stableName).Append("(ts timestamp, v1 bool, v2 tinyint, v3 smallint, v4 int, v5 bigint, v6 float, v7 double, v8 binary(10), v9 nchar(10)) tags(t1 int)"); - res = TDengine.Query(this.conn, sql.ToString()); + } + + public void CreateDb() + { + StringBuilder sql = new StringBuilder(); + sql.Append("CREATE DATABASE IF NOT EXISTS ").Append(this.dbName); + IntPtr res = TDengine.Query(this.conn, sql.ToString()); if (res != IntPtr.Zero) { - Console.WriteLine(sql.ToString() + " success"); + DebugPrint(sql.ToString() + " success\n"); } else { @@ -301,36 +375,20 @@ namespace TDengineDriver ExitProgram(); } TDengine.FreeResult(res); - - for (int i = 0; i < this.numOfTables; i++) - { - sql.Clear(); - sql = sql.Append("create table if not exists ").Append(this.tablePrefix).Append(i) - .Append(" using ").Append(this.stableName).Append(" tags(").Append(i).Append(")"); - res = TDengine.Query(this.conn, sql.ToString()); - if (res != IntPtr.Zero) - { - Console.WriteLine(sql.ToString() + " success"); - } - else - { - Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res)); - ExitProgram(); - } - TDengine.FreeResult(res); - } - - Console.WriteLine("create db and table success"); } - public void CreateDb() + public void CreateStable() { StringBuilder sql = new StringBuilder(); - sql.Append("create database if not exists ").Append(this.dbName); + + sql.Clear(); + sql.Append("CREATE TABLE IF NOT EXISTS "). + Append(this.dbName).Append(".").Append(this.stableName). + Append("(ts timestamp, v1 bool, v2 tinyint, v3 smallint, v4 int, v5 bigint, v6 float, v7 double, v8 binary(10), v9 nchar(10)) tags(t1 int)"); IntPtr res = TDengine.Query(this.conn, sql.ToString()); if (res != IntPtr.Zero) { - Console.WriteLine(sql.ToString() + " success"); + DebugPrint(sql.ToString() + " success\n"); } else { @@ -340,56 +398,67 @@ namespace TDengineDriver TDengine.FreeResult(res); } - - public void ExecuteInsertByThreads() + public void InsertByThreads() { - System.DateTime start = new System.DateTime(); - long loopCount = this.recordsPerTable / this.batchRows; + Thread[] threadArr = new Thread[numOfThreads]; - for (int table = 0; table < this.numOfTables; ++table) + long quotition = numOfTables / numOfThreads; + if (quotition < 1) { - for (long loop = 0; loop < loopCount; loop++) - { - StringBuilder sql = new StringBuilder(); - sql.Append("insert into ").Append(this.tablePrefix).Append(table).Append(" values"); - for (int batch = 0; batch < this.batchRows; ++batch) - { - long rows = loop * this.batchRows + batch; - sql.Append("(") - .Append(this.beginTimestamp + rows) - .Append(", 1, 2, 3,") - .Append(rows) - .Append(", 5, 6, 7, 'abc', 'def')"); - } - IntPtr res = TDengine.Query(this.conn, sql.ToString()); - if (res == IntPtr.Zero) - { - Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res)); - } + numOfThreads = numOfTables; + quotition = 1; + } - int affectRows = TDengine.AffectRows(res); - this.rowsInserted += affectRows; + long remainder = 0; + if (numOfThreads != 0) + { + remainder = numOfTables % numOfThreads; + } - TDengine.FreeResult(res); + long last = 0; + + for (int i = 0; i < numOfThreads; i++) + { + InsertDataThread insertThread = new InsertDataThread(); + insertThread.id = i; + insertThread.recordsPerTable = recordsPerTable; + insertThread.batchRows = batchRows; + insertThread.numOfTables = numOfTables; + insertThread.verbose = verbose; + insertThread.dbName = this.dbName; + insertThread.tablePrefix = this.tablePrefix; + if (useStable) + { + insertThread.stableName = stableName; } - } + insertThread.conn = conn; - System.DateTime end = new System.DateTime(); - TimeSpan ts = end - start; + insertThread.start = last; + if (i < remainder) + { + insertThread.end = last + quotition; + } + else + { + insertThread.end = last + quotition - 1; + } + last = insertThread.end + 1; - Console.Write("Total {0:G} rows inserted, {1:G} rows failed, time spend {2:G} seconds.\n" - , this.rowsInserted, this.recordsPerTable * this.numOfTables - this.rowsInserted, ts.TotalSeconds); + threadArr[i] = new Thread(insertThread.ThreadMain); + threadArr[i].Start(); + threadArr[i].Join(); + } } public void ExecuteQuery() { - System.DateTime start = new System.DateTime(); + // System.DateTime start = new System.DateTime(); long queryRows = 0; for (int i = 0; i < 1/*this.numOfTables*/; ++i) { String sql = "select * from " + this.dbName + "." + tablePrefix + i; - Console.WriteLine(sql); + // Console.WriteLine(sql); IntPtr res = TDengine.Query(conn, sql); if (res == IntPtr.Zero) @@ -399,13 +468,13 @@ namespace TDengineDriver } int fieldCount = TDengine.FieldCount(res); - Console.WriteLine("field count: " + fieldCount); + // Console.WriteLine("field count: " + fieldCount); List metas = TDengine.FetchFields(res); for (int j = 0; j < metas.Count; j++) { TDengineMeta meta = (TDengineMeta)metas[j]; - Console.WriteLine("index:" + j + ", type:" + meta.type + ", typename:" + meta.TypeName() + ", name:" + meta.name + ", size:" + meta.size); + // Console.WriteLine("index:" + j + ", type:" + meta.type + ", typename:" + meta.TypeName() + ", name:" + meta.name + ", size:" + meta.size); } IntPtr rowdata; @@ -482,17 +551,19 @@ namespace TDengineDriver if (TDengine.ErrorNo(res) != 0) { - Console.Write("Query is not complete, Error {0:G}", TDengine.ErrorNo(res), TDengine.Error(res)); + Console.Write("Query is not complete, Error {0:G}", + TDengine.ErrorNo(res), TDengine.Error(res)); } TDengine.FreeResult(res); } + /* + System.DateTime end = new System.DateTime(); + TimeSpan ts = end - start; - System.DateTime end = new System.DateTime(); - TimeSpan ts = end - start; - - Console.Write("Total {0:G} rows inserted, {1:G} rows query, time spend {2:G} seconds.\n" - , this.rowsInserted, queryRows, ts.TotalSeconds); + Console.Write("Total {0:G} rows inserted, {1:G} rows query, time spend {2:G} seconds.\n" + , this.rowsInserted, queryRows, ts.TotalSeconds); + */ } public void CloseConnection() @@ -513,16 +584,167 @@ namespace TDengineDriver tester.InitTDengine(); tester.ConnectTDengine(); + tester.dropDatabase(); tester.CreateDb(); + if (tester.useStable == true) + { + tester.CreateStable(); + } + tester.CreateTablesByThreads(); - tester.ExecuteInsertByThreads(); + + Stopwatch watch = Stopwatch.StartNew(); + tester.InsertByThreads(); + watch.Stop(); + double elapsedMs = watch.Elapsed.TotalMilliseconds; + + Console.WriteLine("Spent {0} seconds to insert {1} records with {2} record(s) per request: {3} records/second", + elapsedMs / 1000, + tester.recordsPerTable * tester.numOfTables, + tester.batchRows, + (tester.recordsPerTable * tester.numOfTables * 1000) / elapsedMs); tester.ExecuteQuery(); tester.CloseConnection(); Console.WriteLine("End."); } + + public class InsertDataThread + { + public long id { set; get; } + public long start { set; get; } + public long end { set; get; } + public string dbName { set; get; } + public IntPtr conn { set; get; } + public string tablePrefix { set; get; } + public string stableName { set; get; } + public long recordsPerTable { set; get; } + public long batchRows { set; get; } + public long numOfTables { set; get; } + public bool verbose { set; get; } + + private void DebugPrintFormat(string format, params object[] parameters) + { + if (verbose == true) + { + Console.Write(format, parameters); + } + } + + private void DebugPrint(string str) + { + if (verbose == true) + { + Console.Write(str); + } + } + + public void ThreadMain() + { + DebugPrintFormat("InsertDataThread {0} from {1} to {2}", id, start, end); + StringBuilder sql = new StringBuilder(); + long beginTimestamp = 1551369600000L; + long rowsInserted = 0; + + // System.DateTime startTime = new System.DateTime(); + long i = 0; + while (i < recordsPerTable) + { + for (long table = start; table <= end; ++table) + { + long inserted = i; + + sql.Clear(); + sql.Append("INSERT INTO "). + Append(this.dbName).Append(".").Append(this.tablePrefix).Append(table). + Append(" VALUES"); + for (int batch = 0; batch < this.batchRows; ++batch) + { + sql.Append("(") + .Append(beginTimestamp + i + batch) + .Append(", 1, 2, 3,") + .Append(i + batch) + .Append(", 5, 6, 7, 'abc', 'def')"); + + } + IntPtr res = TDengine.Query(this.conn, sql.ToString()); + if (res == IntPtr.Zero) + { + DebugPrint(sql.ToString() + " failure, reason: " + TDengine.Error(res) + "\n"); + } + + inserted += this.batchRows; + + int affectRows = TDengine.AffectRows(res); + rowsInserted += affectRows; + + TDengine.FreeResult(res); + if (table == end) + { + i = inserted; + } + } + } + + } + } + + public class CreateTableThread + { + public long id { set; get; } + public long start { set; get; } + public long end { set; get; } + public string dbName { set; get; } + public IntPtr conn { set; get; } + public string tablePrefix { set; get; } + public string stableName { set; get; } + public bool verbose { set; get; } + + private void DebugPrintFormat(string format, params object[] parameters) + { + if (verbose == true) + { + Console.Write(format, parameters); + } + } + + private void DebugPrint(string str) + { + if (verbose == true) + { + Console.Write(str); + } + } + + public void ThreadMain() + { + DebugPrintFormat("CreateTable {0} from {1} to {2}", id, start, end); + + StringBuilder sql = new StringBuilder(); + + for (long tableId = start; tableId <= end; tableId++) + { + sql.Clear(); + sql = sql.Append("CREATE TABLE IF NOT EXISTS "). + Append(this.dbName).Append(".").Append(this.tablePrefix).Append(tableId). + Append(" USING ").Append(this.dbName).Append(".").Append(this.stableName). + Append(" TAGS(").Append(tableId).Append(")"); + IntPtr res = TDengine.Query(this.conn, sql.ToString()); + if (res != IntPtr.Zero) + { + DebugPrint(sql.ToString() + " success\n"); + } + else + { + DebugPrint(sql.ToString() + " failure, reason: " + TDengine.Error(res) + "\n"); + ExitProgram(); + } + TDengine.FreeResult(res); + } + + } + } } } -