提交 2f7cf2b0 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/ad

......@@ -39,7 +39,7 @@ create table D1002 using meters tags ("Beijing.Haidian", 2);
我们已经知道,可以通过下面这条SQL语句以一分钟为时间窗口、30秒为前向增量统计这些电表的平均电压。
```sql
select avg(voltage) from meters interval(1m) sliding(30s)
select avg(voltage) from meters interval(1m) sliding(30s);
```
每次执行这条语句,都会重新计算所有数据。
......@@ -47,14 +47,14 @@ select avg(voltage) from meters interval(1m) sliding(30s);
可以把上面的语句改进成下面的样子,每次使用不同的 `startTime` 并定期执行:
```sql
select avg(voltage) from meters where ts > {startTime} interval(1m) sliding(30s)
select avg(voltage) from meters where ts > {startTime} interval(1m) sliding(30s);
```
这样做没有问题,但TDengine提供了更简单的方法,
只要在最初的查询语句前面加上 `create table {tableName} as ` 就可以了, 例如:
```sql
create table avg_vol as select avg(voltage) from meters interval(1m) sliding(30s)
create table avg_vol as select avg(voltage) from meters interval(1m) sliding(30s);
```
会自动创建一个名为 `avg_vol` 的新表,然后每隔30秒,TDengine会增量执行 `as` 后面的 SQL 语句,
......@@ -80,7 +80,7 @@ taos> select * from avg_vol;
比如使用下面的SQL创建的连续查询将运行一小时,之后会自动停止。
```sql
create table avg_vol as select avg(voltage) from meters where ts > now and ts <= now + 1h interval(1m) sliding(30s)
create table avg_vol as select avg(voltage) from meters where ts > now and ts <= now + 1h interval(1m) sliding(30s);
```
需要说明的是,上面例子中的 `now` 是指创建连续查询的时间,而不是查询执行的时间,否则,查询就无法自动停止了。
......@@ -396,7 +396,7 @@ ts: 1597465200000 current: 11.2 voltage: 220 phase: 1 location: Beijing.Haidian
```sql
# taos
taos> use power
taos> use power;
taos> insert into d1001 values("2020-08-15 12:40:00.000", 12.4, 220, 1);
```
......
......@@ -276,14 +276,14 @@ SQL语句的解析和校验工作在客户端完成。解析SQL语句并生成
在TDengine中引入关键词interval来进行时间轴上固定长度时间窗口的切分,并按照时间窗口对数据进行聚合,对窗口范围内的数据按需进行聚合。例如:
```mysql
select count(*) from d1001 interval(1h)
select count(*) from d1001 interval(1h);
```
针对d1001设备采集的数据,按照1小时的时间窗口返回每小时存储的记录数量。
在需要连续获得查询结果的应用场景下,如果给定的时间区间存在数据缺失,会导致该区间数据结果也丢失。TDengine提供策略针对时间轴聚合计算的结果进行插值,通过使用关键词Fill就能够对时间轴聚合结果进行插值。例如:
```mysql
select count(*) from d1001 interval(1h) fill(prev)
select count(*) from d1001 interval(1h) fill(prev);
```
针对d1001设备采集数据统计每小时记录数,如果某一个小时不存在数据,这返回之前一个小时的统计数据。TDengine提供前向插值(prev)、线性插值(linear)、NULL值填充(NULL)、特定值填充(value)。
......
......@@ -89,7 +89,7 @@ taos>
2. 在第一个数据节点,使用CLI程序taos, 登录进TDengine系统, 执行命令:
```
CREATE DNODE "h2.taos.com:6030"
CREATE DNODE "h2.taos.com:6030";
```
将新数据节点的End Point (准备工作中第四步获知的) 添加进集群的EP列表。**"fqdn:port"需要用双引号引起来**,否则出错。请注意将示例的“h2.taos.com:6030" 替换为这个新数据节点的End Point。
......@@ -97,7 +97,7 @@ taos>
3. 然后执行命令
```
SHOW DNODES
SHOW DNODES;
```
查看新节点是否被成功加入。如果该被加入的数据节点处于离线状态,请做两个检查
......@@ -122,7 +122,7 @@ taos>
执行CLI程序taos, 使用root账号登录进系统, 执行:
```
CREATE DNODE "fqdn:port"
CREATE DNODE "fqdn:port";
```
将新数据节点的End Point添加进集群的EP列表。**"fqdn:port"需要用双引号引起来**,否则出错。一个数据节点对外服务的fqdn和port可以通过配置文件taos.cfg进行配置,缺省是自动获取。【强烈不建议用自动获取方式来配置FQDN,可能导致生成的数据节点的End Point不是所期望的】
......
......@@ -46,7 +46,7 @@ taos>
5. 在第一个节点,使用CLI程序taos, 登录进TDengine系统, 使用命令:
```
CREATE DNODE "h2.taos.com:6030"
CREATE DNODE "h2.taos.com:6030";
```
将新节点的End Point添加进集群的EP列表。**"fqdn:port"需要用双引号引起来**,否则出错。请注意将示例的“h2.taos.com:6030" 替换为你自己第一个节点的End Point
......@@ -54,7 +54,7 @@ taos>
6. 使用命令
```
SHOW DNODES
SHOW DNODES;
```
查看新节点是否被成功加入。
......@@ -71,7 +71,7 @@ taos>
###添加节点
执行CLI程序taos, 使用root账号登录进系统, 执行:
```
CREATE DNODE "fqdn:port"
CREATE DNODE "fqdn:port";
```
将新节点的End Point添加进集群的EP列表。**"fqdn:port"需要用双引号引起来**,否则出错。一个节点对外服务的fqdn和port可以通过配置文件taos.cfg进行配置,缺省是自动获取。
......
......@@ -261,7 +261,7 @@ typedef struct {
};
int32_t insertType;
int32_t clauseIndex; // index of multiple subclause query
int32_t clauseIndex; // index of multiple subclause query
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished;
......@@ -276,7 +276,8 @@ typedef struct {
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
int8_t submitSchema; // submit block is built with table schema
int8_t submitSchema; // submit block is built with table schema
STagData tagData;
SHashObj *pTableList; // referred table involved in sql
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql
} SSqlCmd;
......
......@@ -791,7 +791,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sql += index;
tscAllocPayload(pCmd, sizeof(STagData));
STagData *pTag = (STagData *) pCmd->payload;
STagData *pTag = &pCmd->tagData;
memset(pTag, 0, sizeof(STagData));
......@@ -946,7 +946,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", sToken.z);
}
pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen) + pTag->dataLen;
pTag->dataLen = htonl(pTag->dataLen);
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
......
......@@ -1495,43 +1495,29 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMTableInfoMsg *pInfoMsg;
char * pMsg;
int msgLen = 0;
char *tmpData = NULL;
uint32_t len = pSql->cmd.payloadLen;
if (len > 0) {
if ((tmpData = calloc(1, len)) == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// STagData is in binary format, strncpy is not available
memcpy(tmpData, pSql->cmd.payload, len);
}
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
SCMTableInfoMsg* pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
char* pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
if (pSql->cmd.autoCreated && len > 0) {
memcpy(pInfoMsg->tags, tmpData, len);
pMsg += len;
size_t len = htonl(pCmd->tagData.dataLen);
if (pSql->cmd.autoCreated) {
if (len > 0) {
len += sizeof(pCmd->tagData.name) + sizeof(pCmd->tagData.dataLen);
memcpy(pInfoMsg->tags, &pCmd->tagData, len);
pMsg += len;
}
}
pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
taosTFree(tmpData);
assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
return TSDB_CODE_SUCCESS;
}
......@@ -2184,8 +2170,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen); // tag information if table does not exists.
pNew->cmd.payloadLen = pSql->cmd.payloadLen;
memcpy(&pNew->cmd.tagData, &pSql->cmd.tagData, sizeof(pSql->cmd.tagData));
tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
pNew->fp = tscTableMetaCallBack;
......
......@@ -1806,6 +1806,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
pCmd->command = cmd;
pCmd->parseFinished = 1;
pCmd->autoCreated = pSql->cmd.autoCreated;
memcpy(&pCmd->tagData, &pSql->cmd.tagData, sizeof(pCmd->tagData));
if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew);
......
Subproject commit 06ec30a0f1762e8169bf6b9045c82bcaa52bcdf0
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b
......@@ -262,7 +262,9 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (pIMem != NULL) {
ASSERT(pRepo->commit);
tsdbDebug("vgId:%d waiting for the commit thread", REPO_ID(pRepo));
code = pthread_join(pRepo->commitThread, NULL);
tsdbDebug("vgId:%d commit thread is finished", REPO_ID(pRepo));
if (code != 0) {
tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......
# custom
/out/
/logs/
*.jar
# Created by .ignore support plugin (hsz.mobi)
.gitignore
# Build Artifacts
.gradle/*
build/*
target/*
bin/*
dependency-reduced-pom.xml
# Eclipse Project Files
.classpath
.project
.settings/*
......@@ -65,5 +65,10 @@
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
</project>
package com.taosdata.example.jdbcTaosdemo;
import com.taosdata.example.jdbcTaosdemo.domain.JdbcTaosdemoConfig;
import com.taosdata.example.jdbcTaosdemo.task.CreateTableTask;
import com.taosdata.example.jdbcTaosdemo.task.InsertTableDatetimeTask;
import com.taosdata.example.jdbcTaosdemo.task.InsertTableTask;
import com.taosdata.example.jdbcTaosdemo.utils.TimeStampUtil;
import com.taosdata.jdbc.TSDBDriver;
import org.apache.log4j.Logger;
import java.sql.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
public class JdbcTaosdemo {
private static Logger logger = Logger.getLogger(JdbcTaosdemo.class);
private static AtomicLong beginTimestamp = new AtomicLong(TimeStampUtil.datetimeToLong("2005-01-01 00:00:00.000"));
private final JdbcTaosdemoConfig config;
private Connection connection;
private static final String[] locations = {"Beijing", "Shanghai", "Guangzhou", "Shenzhen", "HangZhou", "Tianjin", "Wuhan", "Changsha", "Nanjing", "Xian"};
private static Random random = new Random(System.currentTimeMillis());
public JdbcTaosdemo(JdbcTaosdemoConfig config) {
this.config = config;
}
public static void main(String[] args) {
JdbcTaosdemoConfig config = JdbcTaosdemoConfig.build(args);
boolean isHelp = Arrays.asList(args).contains("--help");
if (isHelp) {
JdbcTaosdemoConfig.printHelp();
return;
}
if (config.getHost() == null) {
JdbcTaosdemoConfig.printHelp();
return;
}
JdbcTaosdemo taosdemo = new JdbcTaosdemo(config);
taosdemo.init();
taosdemo.dropDatabase();
taosdemo.createDatabase();
taosdemo.useDatabase();
taosdemo.createSuperTable();
taosdemo.createTableMultiThreads();
boolean infinite = Arrays.asList(args).contains("--infinite");
if (infinite) {
logger.info("!!! Infinite Insert Mode Started. !!!!");
taosdemo.insertInfinite();
} else {
taosdemo.insertMultiThreads();
taosdemo.countFromSuperTable();
if (config.isDeleteTable())
taosdemo.dropSuperTable();
taosdemo.close();
}
}
/**
* establish the connection
*/
private void init() {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
connection = getConnection(config);
if (connection != null)
logger.info("[ OK ] Connection established.");
} catch (ClassNotFoundException | SQLException e) {
logger.error(e.getMessage());
throw new RuntimeException("connection failed: " + config.getHost());
}
}
public static Connection getConnection(JdbcTaosdemoConfig config) throws SQLException {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, config.getHost());
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, config.getUser());
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword());
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
return DriverManager.getConnection("jdbc:TAOS://" + config.getHost() + ":" + config.getPort() + "/" + config.getDbName() + "", properties);
}
/**
* create database
*/
private void createDatabase() {
String sql = "create database if not exists " + config.getDbName() + " keep " + config.getKeep() + " days " + config.getDays();
execute(sql);
}
private void dropDatabase() {
String sql = "drop database if exists " + config.getDbName();
execute(sql);
}
/**
* use database
*/
private void useDatabase() {
String sql = "use " + config.getDbName();
execute(sql);
}
private void createSuperTable() {
String sql = "create table if not exists " + config.getStbName() + "(ts timestamp, current float, voltage int, phase float) tags(location binary(64), groupId int)";
execute(sql);
}
/**
* create table use super table with multi threads
*/
private void createTableMultiThreads() {
try {
final int tableSize = config.getNumberOfTable() / config.getNumberOfThreads();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < config.getNumberOfThreads(); i++) {
Thread thread = new Thread(new CreateTableTask(config, i * tableSize, tableSize), "Thread-" + i);
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
logger.info(">>> Multi Threads create table finished.");
} catch (InterruptedException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
private void insertInfinite() {
try {
final long startDatetime = TimeStampUtil.datetimeToLong("2005-01-01 00:00:00.000");
final long finishDatetime = TimeStampUtil.datetimeToLong("2030-01-01 00:00:00.000");
final int tableSize = config.getNumberOfTable() / config.getNumberOfThreads();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < config.getNumberOfThreads(); i++) {
Thread thread = new Thread(new InsertTableDatetimeTask(config, i * tableSize, tableSize, startDatetime, finishDatetime), "Thread-" + i);
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
logger.info(">>> Multi Threads insert table finished.");
} catch (InterruptedException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
private void insertMultiThreads() {
try {
final int tableSize = config.getNumberOfTable() / config.getNumberOfThreads();
final int numberOfRecordsPerTable = config.getNumberOfRecordsPerTable();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < config.getNumberOfThreads(); i++) {
Thread thread = new Thread(new InsertTableTask(config, i * tableSize, tableSize, numberOfRecordsPerTable), "Thread-" + i);
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
logger.info(">>> Multi Threads insert table finished.");
} catch (InterruptedException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
public static String insertSql(int tableIndex, JdbcTaosdemoConfig config) {
float current = 10 + random.nextFloat();
int voltage = 200 + random.nextInt(20);
float phase = random.nextFloat();
String sql = "insert into " + config.getDbName() + "." + config.getTbPrefix() + "" + tableIndex + " " +
"values(" + beginTimestamp.getAndIncrement() + ", " + current + ", " + voltage + ", " + phase + ") ";
return sql;
}
public static String insertSql(int tableIndex, long ts, JdbcTaosdemoConfig config) {
float current = 10 + random.nextFloat();
int voltage = 200 + random.nextInt(20);
float phase = random.nextFloat();
String sql = "insert into " + config.getDbName() + "." + config.getTbPrefix() + "" + tableIndex + " " +
"values(" + ts + ", " + current + ", " + voltage + ", " + phase + ") ";
return sql;
}
public static String batchInsertSql(int tableIndex, long ts, int valueCnt, JdbcTaosdemoConfig config) {
float current = 10 + random.nextFloat();
int voltage = 200 + random.nextInt(20);
float phase = random.nextFloat();
StringBuilder sb = new StringBuilder();
sb.append("insert into " + config.getDbName() + "." + config.getTbPrefix() + "" + tableIndex + " " + "values");
for (int i = 0; i < valueCnt; i++) {
sb.append("(" + (ts + i) + ", " + current + ", " + voltage + ", " + phase + ") ");
}
return sb.toString();
}
public static String createTableSql(int tableIndex, JdbcTaosdemoConfig config) {
String location = locations[random.nextInt(locations.length)];
return "create table d" + tableIndex + " using " + config.getDbName() + "." + config.getStbName() + " tags('" + location + "'," + tableIndex + ")";
}
private void countFromSuperTable() {
String sql = "select count(*) from " + config.getDbName() + "." + config.getStbName();
executeQuery(sql);
}
private void close() {
try {
if (connection != null) {
this.connection.close();
logger.info("connection closed.");
}
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
/**
* drop super table
*/
private void dropSuperTable() {
String sql = "drop table if exists " + config.getDbName() + "." + config.getStbName();
execute(sql);
}
/**
* execute sql, use this method when sql is create, alter, drop..
*/
private void execute(String sql) {
try (Statement statement = connection.createStatement()) {
long start = System.currentTimeMillis();
boolean execute = statement.execute(sql);
long end = System.currentTimeMillis();
printSql(sql, execute, (end - start));
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
private static void printSql(String sql, boolean succeed, long cost) {
logger.info("[ " + (succeed ? "OK" : "ERROR!") + " ] time cost: " + cost + " ms, execute statement ====> " + sql);
}
private void executeQuery(String sql) {
try (Statement statement = connection.createStatement()) {
long start = System.currentTimeMillis();
ResultSet resultSet = statement.executeQuery(sql);
long end = System.currentTimeMillis();
printSql(sql, true, (end - start));
printResult(resultSet);
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
private static void printResult(ResultSet resultSet) throws SQLException {
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
StringBuilder sb = new StringBuilder();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
String columnLabel = metaData.getColumnLabel(i);
String value = resultSet.getString(i);
sb.append(columnLabel + ": " + value + "\t");
}
logger.info(sb.toString());
}
}
}
package com.taosdata.example.jdbcTaosdemo.domain;
public class JdbcTaosdemoConfig {
//The host to connect to TDengine. Must insert one
private String host;
//The TCP/IP port number to use for the connection. Default is 6030.
private int port = 6030;
//The TDengine user name to use when connecting to the server. Default is 'root'
private String user = "root";
//The password to use when connecting to the server. Default is 'taosdata'
private String password = "taosdata";
//Destination database. Default is 'test'
private String dbName = "test";
//keep
private int keep = 365 * 20;
//
private int days = 30;
//Super table Name. Default is 'meters'
private String stbName = "meters";
//Table name prefix. Default is 'd'
private String tbPrefix = "d";
//The number of threads. Default is 10.
private int numberOfThreads = 10;
//The number of tables. Default is 10000.
private int numberOfTable = 10000;
//The number of records per table. Default is 100000
private int numberOfRecordsPerTable = 100000;
//Delete data. Default is false
private boolean deleteTable = true;
public static void printHelp() {
System.out.println("Usage: java -jar JDBCConnectorChecker.jar -h host [OPTION...]");
System.out.println("-p port The TCP/IP port number to use for the connection. Default is 6030");
System.out.println("-u user The TDengine user name to use when connecting to the server. Default is 'root'");
System.out.println("-P password The password to use when connecting to the server.Default is 'taosdata'");
System.out.println("-d database Destination database. Default is 'test'");
System.out.println("-m tablePrefix Table prefix name. Default is 'd'");
System.out.println("-T num_of_threads The number of threads. Default is 10");
System.out.println("-t num_of_tables The number of tables. Default is 10000");
System.out.println("-n num_of_records_per_table The number of records per table. Default is 100000");
System.out.println("-D delete table Delete data methods. Default is false");
System.out.println("--help Give this help list");
}
/**
* parse args from command line
*
* @param args command line args
* @return JdbcTaosdemoConfig
*/
public static JdbcTaosdemoConfig build(String[] args) {
JdbcTaosdemoConfig config = new JdbcTaosdemoConfig();
for (int i = 0; i < args.length; i++) {
if ("-h".equals(args[i]) && i < args.length - 1) {
config.setHost(args[++i]);
}
if ("-p".equals(args[i]) && i < args.length - 1) {
config.setPort(Integer.parseInt(args[++i]));
}
if ("-u".equals(args[i]) && i < args.length - 1) {
config.setUser(args[++i]);
}
if ("-P".equals(args[i]) && i < args.length - 1) {
config.setPassword(args[++i]);
}
if ("-d".equals(args[i]) && i < args.length - 1) {
config.setDbName(args[++i]);
}
if ("-m".equals(args[i]) && i < args.length - 1) {
config.setTbPrefix(args[++i]);
}
if ("-T".equals(args[i]) && i < args.length - 1) {
config.setNumberOfThreads(Integer.parseInt(args[++i]));
}
if ("-t".equals(args[i]) && i < args.length - 1) {
config.setNumberOfTable(Integer.parseInt(args[++i]));
}
if ("-n".equals(args[i]) && i < args.length - 1) {
config.setNumberOfRecordsPerTable(Integer.parseInt(args[++i]));
}
if ("-D".equals(args[i]) && i < args.length - 1) {
config.setDeleteTable(Boolean.parseBoolean(args[++i]));
}
}
return config;
}
public void setHost(String host) {
this.host = host;
}
public String getHost() {
return host;
}
public String getDbName() {
return dbName;
}
public void setDbName(String dbName) {
this.dbName = dbName;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getStbName() {
return stbName;
}
public void setStbName(String stbName) {
this.stbName = stbName;
}
public String getTbPrefix() {
return tbPrefix;
}
public void setTbPrefix(String tbPrefix) {
this.tbPrefix = tbPrefix;
}
public int getNumberOfThreads() {
return numberOfThreads;
}
public void setNumberOfThreads(int numberOfThreads) {
this.numberOfThreads = numberOfThreads;
}
public int getNumberOfTable() {
return numberOfTable;
}
public void setNumberOfTable(int numberOfTable) {
this.numberOfTable = numberOfTable;
}
public int getNumberOfRecordsPerTable() {
return numberOfRecordsPerTable;
}
public void setNumberOfRecordsPerTable(int numberOfRecordsPerTable) {
this.numberOfRecordsPerTable = numberOfRecordsPerTable;
}
public boolean isDeleteTable() {
return deleteTable;
}
public void setDeleteTable(boolean deleteTable) {
this.deleteTable = deleteTable;
}
public int getKeep() {
return keep;
}
public void setKeep(int keep) {
this.keep = keep;
}
public int getDays() {
return days;
}
public void setDays(int days) {
this.days = days;
}
}
package com.taosdata.example.jdbcTaosdemo.task;
import com.taosdata.example.jdbcTaosdemo.JdbcTaosdemo;
import com.taosdata.example.jdbcTaosdemo.domain.JdbcTaosdemoConfig;
import org.apache.log4j.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
public class CreateTableTask implements Runnable {
private static Logger logger = Logger.getLogger(CreateTableTask.class);
private final JdbcTaosdemoConfig config;
private final int startIndex;
private final int tableNumber;
public CreateTableTask(JdbcTaosdemoConfig config, int startIndex, int tableNumber) {
this.config = config;
this.startIndex = startIndex;
this.tableNumber = tableNumber;
}
@Override
public void run() {
try {
Connection connection = JdbcTaosdemo.getConnection(config);
for (int i = startIndex; i < startIndex + tableNumber; i++) {
Statement statement = connection.createStatement();
String sql = JdbcTaosdemo.createTableSql(i + 1, config);
// long start = System.currentTimeMillis();
boolean execute = statement.execute(sql);
// long end = System.currentTimeMillis();
// printSql(sql, execute, (end - start));
statement.close();
logger.info(">>> " + sql);
}
connection.close();
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
}
package com.taosdata.example.jdbcTaosdemo.task;
import com.taosdata.example.jdbcTaosdemo.JdbcTaosdemo;
import com.taosdata.example.jdbcTaosdemo.domain.JdbcTaosdemoConfig;
import org.apache.log4j.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
public class InsertTableDatetimeTask implements Runnable {
private static Logger logger = Logger.getLogger(InsertTableDatetimeTask.class);
private final JdbcTaosdemoConfig config;
private final int startTableIndex;
private final int tableNumber;
private final long startDatetime;
private final long finishedDatetime;
public InsertTableDatetimeTask(JdbcTaosdemoConfig config, int startTableIndex, int tableNumber, long startDatetime, long finishedDatetime) {
this.config = config;
this.startTableIndex = startTableIndex;
this.tableNumber = tableNumber;
this.startDatetime = startDatetime;
this.finishedDatetime = finishedDatetime;
}
@Override
public void run() {
try {
Connection connection = JdbcTaosdemo.getConnection(config);
int valueCnt = 100;
for (long ts = startDatetime; ts < finishedDatetime; ts+= valueCnt) {
for (int i = startTableIndex; i < startTableIndex + tableNumber; i++) {
// String sql = JdbcTaosdemo.insertSql(i + 1, ts, config);
String sql = JdbcTaosdemo.batchInsertSql(i + 1, ts, valueCnt, config);
Statement statement = connection.createStatement();
statement.execute(sql);
statement.close();
logger.info(Thread.currentThread().getName() + ">>> " + sql);
}
}
connection.close();
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
}
package com.taosdata.example.jdbcTaosdemo.task;
import com.taosdata.example.jdbcTaosdemo.JdbcTaosdemo;
import com.taosdata.example.jdbcTaosdemo.domain.JdbcTaosdemoConfig;
import org.apache.log4j.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
public class InsertTableTask implements Runnable {
private static final Logger logger = Logger.getLogger(InsertTableTask.class);
private final JdbcTaosdemoConfig config;
private final int startIndex;
private final int tableNumber;
private final int recordsNumber;
public InsertTableTask(JdbcTaosdemoConfig config, int startIndex, int tableNumber, int recordsNumber) {
this.config = config;
this.startIndex = startIndex;
this.tableNumber = tableNumber;
this.recordsNumber = recordsNumber;
}
@Override
public void run() {
try {
Connection connection = JdbcTaosdemo.getConnection(config);
for (int i = startIndex; i < startIndex + tableNumber; i++) {
for (int j = 0; j < recordsNumber; j++) {
String sql = JdbcTaosdemo.insertSql(i + 1, config);
Statement statement = connection.createStatement();
statement.execute(sql);
statement.close();
logger.info(Thread.currentThread().getName() + ">>> " + sql);
}
}
connection.close();
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
}
package com.taosdata.example.jdbcTaosdemo.utils;
import java.sql.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;
public class TimeStampUtil {
private static final String datetimeFormat = "yyyy-MM-dd HH:mm:ss.SSS";
public static long datetimeToLong(String dateTime) {
SimpleDateFormat sdf = new SimpleDateFormat(datetimeFormat);
try {
return sdf.parse(dateTime).getTime();
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
public static String longToDatetime(long time) {
SimpleDateFormat sdf = new SimpleDateFormat(datetimeFormat);
return sdf.format(new Date(time));
}
public static void main(String[] args) {
final String startTime = "2005-01-01 00:00:00.000";
long start = TimeStampUtil.datetimeToLong(startTime);
System.out.println(start);
String datetime = TimeStampUtil.longToDatetime(1519833600000L);
System.out.println(datetime);
}
}
### 设置###
#log4j.rootLogger=debug,stdout,DebugLog,ErrorLog
log4j.rootLogger=debug,DebugLog,ErrorLog
### 输出信息到控制抬 ###
#log4j.appender.stdout=org.apache.log4j.ConsoleAppender
#log4j.appender.stdout.Target=System.out
#log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
### 输出DEBUG 级别以上的日志到=logs/error.log ###
log4j.appender.DebugLog=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DebugLog.File=logs/debug.log
log4j.appender.DebugLog.Append=true
log4j.appender.DebugLog.Threshold=DEBUG
log4j.appender.DebugLog.layout=org.apache.log4j.PatternLayout
log4j.appender.DebugLog.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
### 输出ERROR 级别以上的日志到=logs/error.log ###
log4j.appender.ErrorLog=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ErrorLog.File=logs/error.log
log4j.appender.ErrorLog.Append=true
log4j.appender.ErrorLog.Threshold=ERROR
log4j.appender.ErrorLog.layout=org.apache.log4j.PatternLayout
log4j.appender.ErrorLog.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
\ No newline at end of file
......@@ -22,8 +22,12 @@ if __name__ == '__main__':
# @password : Password
# @database : Database to use when connecting to TDengine server
# @config : Configuration directory
conn = taos.connect(host="127.0.0.1", user="root", password="taosdata", config="/etc/taos")
if len(sys.argv)>1:
hostname=sys.argv[1]
conn = taos.connect(host=hostname, user="root", password="taosdata", config="/etc/taos")
else:
conn = taos.connect(host="127.0.0.1", user="root", password="taosdata", config="/etc/taos")
# Generate a cursor object to run SQL commands
c1 = conn.cursor()
# Create a database named db
......
......@@ -22,8 +22,8 @@ IF (TD_LINUX)
#add_executable(importOneRow importOneRow.c)
#target_link_libraries(importOneRow taos_static pthread)
#add_executable(importPerTable importPerTable.c)
#target_link_libraries(importPerTable taos_static pthread)
add_executable(importPerTable importPerTable.c)
target_link_libraries(importPerTable taos_static pthread)
#add_executable(hashPerformance hashPerformance.c)
#target_link_libraries(hashPerformance taos_static tutil common pthread)
......
......@@ -20,6 +20,7 @@
#include "ttimer.h"
#include "tutil.h"
#include "tglobal.h"
#include "osTime.h"
#define MAX_RANDOM_POINTS 20000
#define GREEN "\033[1;32m"
......@@ -43,14 +44,16 @@ void createDbAndTable();
void insertData();
int32_t randomData[MAX_RANDOM_POINTS];
int64_t rowsPerTable = 10000;
int64_t rowsPerTable = 1000000;
int64_t pointsPerTable = 1;
int64_t numOfThreads = 1;
int64_t numOfTablesPerThread = 1;
int64_t numOfThreads = 10;
int64_t numOfTablesPerThread = 100;
char dbName[32] = "db";
char stableName[64] = "st";
int32_t cache = 16384;
int32_t tables = 1000;
int64_t totalUs = 0;
int64_t reqNum = 0;
int64_t maxUs = 0;
int64_t minUs = 100000000;
int main(int argc, char *argv[]) {
shellParseArgument(argc, argv);
......@@ -58,6 +61,38 @@ int main(int argc, char *argv[]) {
taos_init();
createDbAndTable();
insertData();
int64_t avgUs = totalUs / reqNum;
pError("%s totalUs:%ld, avgUs:%ld maxUs:%ld minUs:%ld reqNum:%ld %s\n", GREEN, totalUs, avgUs, maxUs, minUs, reqNum, NC);
}
int32_t query(void *con, char *qstr) {
int64_t begin = taosGetTimestampUs();
TAOS_RES *pSql = taos_query(con, qstr);
int32_t code = taos_errno(pSql);
if (code != 0) {
pError("failed to exec sql:%s, code:%d reason:%s", qstr, taos_errno(con), taos_errstr(con));
exit(0);
}
taos_free_result(pSql);
int64_t us = taosGetTimestampUs() - begin;
maxUs = MAX(us, maxUs);
minUs = MIN(us, minUs);
atomic_add_fetch_64(&totalUs, us);
atomic_add_fetch_64(&reqNum, 1);
if (reqNum > 100000) {
int64_t avgUs = totalUs / reqNum;
if (us > avgUs * 100) {
pError("sql:%s", qstr);
pError("%s totalUs:%ld, avgUs:%ld maxUs:%ld minUs:%ld reqNum:%ld %s\n", GREEN, totalUs, avgUs, maxUs, minUs,
reqNum, NC);
taosMsleep(1000);
exit(0);
}
}
return code;
}
void createDbAndTable() {
......@@ -79,14 +114,14 @@ void createDbAndTable() {
exit(1);
}
sprintf(qstr, "create database if not exists %s cache %d tables %d", dbName, cache, tables);
if (taos_query(con, qstr)) {
sprintf(qstr, "create database if not exists %s", dbName);
if (query(con, qstr)) {
pError("failed to create database:%s, code:%d reason:%s", dbName, taos_errno(con), taos_errstr(con));
exit(0);
}
sprintf(qstr, "use %s", dbName);
if (taos_query(con, qstr)) {
if (query(con, qstr)) {
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con));
exit(0);
}
......@@ -102,14 +137,14 @@ void createDbAndTable() {
}
sprintf(qstr + len, ") tags(t int)");
if (taos_query(con, qstr)) {
if (query(con, qstr)) {
pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con));
exit(0);
}
for (int64_t t = 0; t < totalTables; ++t) {
sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t);
if (taos_query(con, qstr)) {
if (query(con, qstr)) {
pError("failed to create table %s%" PRId64 ", reason:%s", stableName, t, taos_errstr(con));
exit(0);
}
......@@ -122,7 +157,7 @@ void createDbAndTable() {
}
sprintf(qstr + len, ")");
if (taos_query(con, qstr)) {
if (query(con, qstr)) {
pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con));
exit(0);
}
......@@ -207,7 +242,7 @@ void *syncTest(void *param) {
}
sprintf(qstr, "use %s", pInfo->dbName);
taos_query(con, qstr);
query(con, qstr);
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
......@@ -229,7 +264,7 @@ void *syncTest(void *param) {
}
len += sprintf(sql + len, ")");
if (len > maxBytes) {
if (taos_query(con, qstr)) {
if (query(con, qstr)) {
pError("thread:%d, failed to import table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName,
table, row, taos_errstr(con));
}
......@@ -246,7 +281,7 @@ void *syncTest(void *param) {
}
if (len != strlen(inserStr)) {
taos_query(con, qstr);
query(con, qstr);
}
gettimeofday(&systemTime, NULL);
......@@ -284,10 +319,6 @@ void printHelp() {
printf("%s%s%s%" PRId64 "\n", indent, indent, "Number of threads to be used, default is ", numOfThreads);
printf("%s%s\n", indent, "-n");
printf("%s%s%s%" PRId64 "\n", indent, indent, "Number of tables per thread, default is ", numOfTablesPerThread);
printf("%s%s\n", indent, "-tables");
printf("%s%s%s%d\n", indent, indent, "Database parameters tables, default is ", tables);
printf("%s%s\n", indent, "-cache");
printf("%s%s%s%d\n", indent, indent, "Database parameters cache, default is ", cache);
exit(EXIT_SUCCESS);
}
......@@ -311,10 +342,6 @@ void shellParseArgument(int argc, char *argv[]) {
numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) {
numOfTablesPerThread = atoi(argv[++i]);
} else if (strcmp(argv[i], "-tables") == 0) {
tables = atoi(argv[++i]);
} else if (strcmp(argv[i], "-cache") == 0) {
cache = atoi(argv[++i]);
} else {
}
}
......@@ -323,8 +350,6 @@ void shellParseArgument(int argc, char *argv[]) {
pPrint("%spointsPerTable:%" PRId64 "%s", GREEN, pointsPerTable, NC);
pPrint("%snumOfThreads:%" PRId64 "%s", GREEN, numOfThreads, NC);
pPrint("%snumOfTablesPerThread:%" PRId64 "%s", GREEN, numOfTablesPerThread, NC);
pPrint("%scache:%d%s", GREEN, cache, NC);
pPrint("%stables:%d%s", GREEN, tables, NC);
pPrint("%sdbName:%s%s", GREEN, dbName, NC);
pPrint("%stableName:%s%s", GREEN, stableName, NC);
pPrint("%sstart to run%s", GREEN, NC);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册