提交 b01f13bb 编写于 作者: Z zyyang

change

上级 e6154984
package com.taosdata.taosdemo; package com.taosdata.taosdemo;
import com.taosdata.taosdemo.components.DataSourceFactory; import com.taosdata.taosdemo.components.DataSourceFactory;
import com.taosdata.taosdemo.domain.FieldMeta; import com.taosdata.taosdemo.components.JdbcTaosdemoConfig;
import com.taosdata.taosdemo.domain.SuperTableMeta; import com.taosdata.taosdemo.domain.SuperTableMeta;
import com.taosdata.taosdemo.domain.TagMeta;
import com.taosdata.taosdemo.service.DatabaseService; import com.taosdata.taosdemo.service.DatabaseService;
import com.taosdata.taosdemo.service.InsertTask;
import com.taosdata.taosdemo.service.SubTableService; import com.taosdata.taosdemo.service.SubTableService;
import com.taosdata.taosdemo.service.SuperTableService; import com.taosdata.taosdemo.service.SuperTableService;
import com.taosdata.taosdemo.service.data.SuperTableMetaGenerator; import com.taosdata.taosdemo.service.data.SuperTableMetaGenerator;
import com.taosdata.taosdemo.components.JdbcTaosdemoConfig;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.*; import java.util.Arrays;
import java.util.concurrent.ExecutionException; import java.util.HashMap;
import java.util.concurrent.FutureTask; import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class TaosDemoApplication { public class TaosDemoApplication {
private static Logger logger = Logger.getLogger(TaosDemoApplication.class); private static Logger logger = Logger.getLogger(TaosDemoApplication.class);
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
// 读配置参数 // 读配置参数
JdbcTaosdemoConfig config = new JdbcTaosdemoConfig(args); JdbcTaosdemoConfig config = new JdbcTaosdemoConfig(args);
boolean isHelp = Arrays.asList(args).contains("--help"); boolean isHelp = Arrays.asList(args).contains("--help");
...@@ -34,12 +29,11 @@ public class TaosDemoApplication { ...@@ -34,12 +29,11 @@ public class TaosDemoApplication {
JdbcTaosdemoConfig.printHelp(); JdbcTaosdemoConfig.printHelp();
System.exit(0); System.exit(0);
} }
// 初始化
final DataSource dataSource = DataSourceFactory.getInstance(config.host, config.port, config.user, config.password); final DataSource dataSource = DataSourceFactory.getInstance(config.host, config.port, config.user, config.password);
final DatabaseService databaseService = new DatabaseService(dataSource); final DatabaseService databaseService = new DatabaseService(dataSource);
final SuperTableService superTableService = new SuperTableService(dataSource); final SuperTableService superTableService = new SuperTableService(dataSource);
final SubTableService subTableService = new SubTableService(dataSource); final SubTableService subTableService = new SubTableService(dataSource);
// 创建数据库 // 创建数据库
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Map<String, String> databaseParam = new HashMap<>(); Map<String, String> databaseParam = new HashMap<>();
...@@ -54,7 +48,7 @@ public class TaosDemoApplication { ...@@ -54,7 +48,7 @@ public class TaosDemoApplication {
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
logger.info(">>> create database time cost : " + (end - start) + " ms."); logger.info(">>> create database time cost : " + (end - start) + " ms.");
/**********************************************************************************/ /**********************************************************************************/
// 超级表的meta // 构造超级表的meta
SuperTableMeta superTableMeta; SuperTableMeta superTableMeta;
// create super table // create super table
if (config.superTableSQL != null) { if (config.superTableSQL != null) {
...@@ -63,19 +57,8 @@ public class TaosDemoApplication { ...@@ -63,19 +57,8 @@ public class TaosDemoApplication {
if (config.database != null && !config.database.isEmpty()) if (config.database != null && !config.database.isEmpty())
superTableMeta.setDatabase(config.database); superTableMeta.setDatabase(config.database);
} else if (config.numOfFields == 0) { } else if (config.numOfFields == 0) {
// default sql = "create table test.weather (ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)"; String sql = "create table " + config.database + "." + config.superTable + " (ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int)";
superTableMeta = new SuperTableMeta(); superTableMeta = SuperTableMetaGenerator.generate(sql);
superTableMeta.setDatabase(config.database);
superTableMeta.setName(config.superTable);
List<FieldMeta> fields = new ArrayList<>();
fields.add(new FieldMeta("ts", "timestamp"));
fields.add(new FieldMeta("temperature", "float"));
fields.add(new FieldMeta("humidity", "int"));
superTableMeta.setFields(fields);
List<TagMeta> tags = new ArrayList<>();
tags.add(new TagMeta("location", "nchar(64)"));
tags.add(new TagMeta("groupId", "int"));
superTableMeta.setTags(tags);
} else { } else {
// create super table with specified field size and tag size // create super table with specified field size and tag size
superTableMeta = SuperTableMetaGenerator.generate(config.database, config.superTable, config.numOfFields, config.prefixOfFields, config.numOfTags, config.prefixOfTags); superTableMeta = SuperTableMetaGenerator.generate(config.database, config.superTable, config.numOfFields, config.prefixOfFields, config.numOfTags, config.prefixOfTags);
...@@ -104,43 +87,7 @@ public class TaosDemoApplication { ...@@ -104,43 +87,7 @@ public class TaosDemoApplication {
start = System.currentTimeMillis(); start = System.currentTimeMillis();
// multi threads to insert // multi threads to insert
List<FutureTask> taskList = new ArrayList<>(); int affectedRows = subTableService.insertAutoCreateTable(superTableMeta, threadSize, tableSize, startTime, gap, config);
List<Thread> threads = IntStream.range(0, threadSize)
.mapToObj(i -> {
long startInd = i * gap;
long endInd = (i + 1) * gap < tableSize ? (i + 1) * gap : tableSize;
FutureTask<Integer> task = new FutureTask<>(
new InsertTask(superTableMeta,
startInd, endInd,
startTime, config.timeGap,
config.numOfRowsPerTable, config.numOfTablesPerSQL, config.numOfValuesPerSQL,
config.order, config.rate, config.range,
config.prefixOfTable, config.autoCreateTable, dataSource)
);
taskList.add(task);
return new Thread(task, "InsertThread-" + i);
}).collect(Collectors.toList());
threads.stream().forEach(Thread::start);
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int affectedRows = 0;
for (FutureTask<Integer> task : taskList) {
try {
affectedRows += task.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
end = System.currentTimeMillis(); end = System.currentTimeMillis();
logger.info("insert " + affectedRows + " rows, time cost: " + (end - start) + " ms"); logger.info("insert " + affectedRows + " rows, time cost: " + (end - start) + " ms");
/**********************************************************************************/ /**********************************************************************************/
......
package com.taosdata.taosdemo.service;
import com.taosdata.taosdemo.domain.SubTableValue;
import com.taosdata.taosdemo.domain.SuperTableMeta;
import com.taosdata.taosdemo.service.data.SubTableValueGenerator;
import javax.sql.DataSource;
import java.sql.Connection;
import java.util.List;
import java.util.concurrent.Callable;
public class InsertTask implements Callable<Integer> {
private final long startTableInd; // included
private final long endTableInd; // excluded
private final long startTime;
private final long timeGap;
private final long numOfRowsPerTable;
private long numOfTablesPerSQL;
private long numOfValuesPerSQL;
private final SuperTableMeta superTableMeta;
private final int order;
private final int rate;
private final long range;
private final String prefixOfTable;
private final boolean autoCreateTable;
private final DataSource dataSource;
public InsertTask(SuperTableMeta superTableMeta, long startTableInd, long endTableInd,
long startTime, long timeGap,
long numOfRowsPerTable, long numOfTablesPerSQL, long numOfValuesPerSQL,
int order, int rate, long range,
String prefixOfTable, boolean autoCreateTable, DataSource dataSource) {
this.superTableMeta = superTableMeta;
this.startTableInd = startTableInd;
this.endTableInd = endTableInd;
this.startTime = startTime;
this.timeGap = timeGap;
this.numOfRowsPerTable = numOfRowsPerTable;
this.numOfTablesPerSQL = numOfTablesPerSQL;
this.numOfValuesPerSQL = numOfValuesPerSQL;
this.order = order;
this.rate = rate;
this.range = range;
this.prefixOfTable = prefixOfTable;
this.autoCreateTable = autoCreateTable;
this.dataSource = dataSource;
}
@Override
public Integer call() throws Exception {
Connection connection = dataSource.getConnection();
long numOfTables = endTableInd - startTableInd;
if (numOfRowsPerTable < numOfValuesPerSQL)
numOfValuesPerSQL = (int) numOfRowsPerTable;
if (numOfTables < numOfTablesPerSQL)
numOfTablesPerSQL = (int) numOfTables;
// row
for (long rowCnt = 0; rowCnt < numOfRowsPerTable; ) {
long rowSize = numOfValuesPerSQL;
if (rowCnt + rowSize > numOfRowsPerTable) {
rowSize = numOfRowsPerTable - rowCnt;
}
//table
for (long tableCnt = startTableInd; tableCnt < endTableInd; ) {
long tableSize = numOfTablesPerSQL;
if (tableCnt + tableSize > endTableInd) {
tableSize = endTableInd - tableCnt;
}
long startTime = this.startTime + rowCnt * timeGap;
// System.out.println(Thread.currentThread().getName() + " >>> " + "rowCnt: " + rowCnt + ", rowSize: " + rowSize + ", " + "tableCnt: " + tableCnt + ",tableSize: " + tableSize + ", " + "startTime: " + startTime + ",timeGap: " + timeGap + "");
/***********************************************/
// 生成数据
List<SubTableValue> data = SubTableValueGenerator.generate(superTableMeta, prefixOfTable, tableCnt, tableSize, rowSize, startTime, timeGap);
// 乱序
if (order != 0) {
SubTableValueGenerator.disrupt(data, rate, range);
}
// insert
SubTableService subTableService = new SubTableService(dataSource);
if (autoCreateTable) {
subTableService.insertAutoCreateTable(data);
} else {
subTableService.insert(data);
}
/***********************************************/
tableCnt += tableSize;
}
rowCnt += rowSize;
}
connection.close();
return 1;
}
}
package com.taosdata.taosdemo.service; package com.taosdata.taosdemo.service;
import com.taosdata.taosdemo.components.JdbcTaosdemoConfig;
import com.taosdata.taosdemo.dao.SubTableMapper; import com.taosdata.taosdemo.dao.SubTableMapper;
import com.taosdata.taosdemo.dao.SubTableMapperImpl; import com.taosdata.taosdemo.dao.SubTableMapperImpl;
import com.taosdata.taosdemo.domain.SubTableMeta; import com.taosdata.taosdemo.domain.SubTableMeta;
import com.taosdata.taosdemo.domain.SubTableValue; import com.taosdata.taosdemo.domain.SubTableValue;
import com.taosdata.taosdemo.domain.SuperTableMeta; import com.taosdata.taosdemo.domain.SuperTableMeta;
import com.taosdata.taosdemo.service.data.SubTableMetaGenerator; import com.taosdata.taosdemo.service.data.SubTableMetaGenerator;
import com.taosdata.taosdemo.service.data.SubTableValueGenerator;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.*;
import java.util.concurrent.Executors; import java.util.stream.Collectors;
import java.util.concurrent.Future; import java.util.stream.IntStream;
public class SubTableService extends AbstractService { public class SubTableService extends AbstractService {
private SubTableMapper mapper; private SubTableMapper mapper;
private static final Logger logger = Logger.getLogger(SubTableService.class);
public SubTableService(DataSource datasource) { public SubTableService(DataSource datasource) {
this.mapper = new SubTableMapperImpl(datasource); this.mapper = new SubTableMapperImpl(datasource);
...@@ -72,4 +76,132 @@ public class SubTableService extends AbstractService { ...@@ -72,4 +76,132 @@ public class SubTableService extends AbstractService {
return mapper.insertMultiTableMultiValuesUsingSuperTable(subTableValues); return mapper.insertMultiTableMultiValuesUsingSuperTable(subTableValues);
} }
public int insertAutoCreateTable(SuperTableMeta superTableMeta, int threadSize, long tableSize, long startTime, long gap, JdbcTaosdemoConfig config) {
long start = System.currentTimeMillis();
List<FutureTask> taskList = new ArrayList<>();
List<Thread> threads = IntStream.range(0, threadSize)
.mapToObj(i -> {
long startInd = i * gap;
long endInd = (i + 1) * gap < tableSize ? (i + 1) * gap : tableSize;
FutureTask<Integer> task = new FutureTask<>(
new InsertTask(superTableMeta,
startInd, endInd,
startTime, config.timeGap,
config.numOfRowsPerTable, config.numOfTablesPerSQL, config.numOfValuesPerSQL,
config.order, config.rate, config.range,
config.prefixOfTable, config.autoCreateTable)
);
taskList.add(task);
return new Thread(task, "InsertThread-" + i);
}).collect(Collectors.toList());
threads.stream().forEach(Thread::start);
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int affectedRows = 0;
for (FutureTask<Integer> task : taskList) {
try {
affectedRows += task.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
logger.info("insert " + affectedRows + " rows, time cost: " + (end - start) + " ms");
return affectedRows;
}
private class InsertTask implements Callable<Integer> {
private final long startTableInd; // included
private final long endTableInd; // excluded
private final long startTime;
private final long timeGap;
private final long numOfRowsPerTable;
private long numOfTablesPerSQL;
private long numOfValuesPerSQL;
private final SuperTableMeta superTableMeta;
private final int order;
private final int rate;
private final long range;
private final String prefixOfTable;
private final boolean autoCreateTable;
public InsertTask(SuperTableMeta superTableMeta, long startTableInd, long endTableInd,
long startTime, long timeGap,
long numOfRowsPerTable, long numOfTablesPerSQL, long numOfValuesPerSQL,
int order, int rate, long range,
String prefixOfTable, boolean autoCreateTable) {
this.superTableMeta = superTableMeta;
this.startTableInd = startTableInd;
this.endTableInd = endTableInd;
this.startTime = startTime;
this.timeGap = timeGap;
this.numOfRowsPerTable = numOfRowsPerTable;
this.numOfTablesPerSQL = numOfTablesPerSQL;
this.numOfValuesPerSQL = numOfValuesPerSQL;
this.order = order;
this.rate = rate;
this.range = range;
this.prefixOfTable = prefixOfTable;
this.autoCreateTable = autoCreateTable;
}
@Override
public Integer call() {
long numOfTables = endTableInd - startTableInd;
if (numOfRowsPerTable < numOfValuesPerSQL)
numOfValuesPerSQL = (int) numOfRowsPerTable;
if (numOfTables < numOfTablesPerSQL)
numOfTablesPerSQL = (int) numOfTables;
int affectRows = 0;
// row
for (long rowCnt = 0; rowCnt < numOfRowsPerTable; ) {
long rowSize = numOfValuesPerSQL;
if (rowCnt + rowSize > numOfRowsPerTable) {
rowSize = numOfRowsPerTable - rowCnt;
}
//table
for (long tableCnt = startTableInd; tableCnt < endTableInd; ) {
long tableSize = numOfTablesPerSQL;
if (tableCnt + tableSize > endTableInd) {
tableSize = endTableInd - tableCnt;
}
long startTime = this.startTime + rowCnt * timeGap;
// System.out.println(Thread.currentThread().getName() + " >>> " + "rowCnt: " + rowCnt + ", rowSize: " + rowSize + ", " + "tableCnt: " + tableCnt + ",tableSize: " + tableSize + ", " + "startTime: " + startTime + ",timeGap: " + timeGap + "");
/***********************************************/
// 生成数据
List<SubTableValue> data = SubTableValueGenerator.generate(superTableMeta, prefixOfTable, tableCnt, tableSize, rowSize, startTime, timeGap);
// 乱序
if (order != 0)
SubTableValueGenerator.disrupt(data, rate, range);
// insert
if (autoCreateTable)
affectRows = insertAutoCreateTable(data);
else
affectRows = insert(data);
/***********************************************/
tableCnt += tableSize;
}
rowCnt += rowSize;
}
return affectRows;
}
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册