Unverified commit b389f1cf authored by haojun Liao, committed by GitHub

Merge pull request #402 from fangpanpan/master

add opentsdb test code
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.opentsdb.test</groupId>
<artifactId>opentsdbtest</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-plugins</artifactId>
<version>30</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<mainClass>OpentsdbTest</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<name>opentsdbtest</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>net.opentsdb</groupId>
<artifactId>opentsdb_gwt_theme</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>net.opentsdb</groupId>
<artifactId>opentsdb</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>com.google.gwt</groupId>
<artifactId>gwt-user</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.10.6.Final</version>
</dependency>
<dependency>
<groupId>com.stumbleupon</groupId>
<artifactId>async</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-jexl</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.4</version>
</dependency>
<dependency>
<groupId>org.jgrapht</groupId>
<artifactId>jgrapht-core</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.21.1</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-mapper-runtime</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>com.github.eulery</groupId>
<artifactId>opentsdb-java-sdk</artifactId>
<version>1.1.4</version>
</dependency>
</dependencies>
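<!-- The hbase profile below (active by default) pulls in asynchbase and ZooKeeper,
     the HBase client stack that the opentsdb dependency talks to at runtime. -->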
<profiles>
<profile>
<id>hbase</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.hbase</groupId>
<artifactId>asynchbase</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
OpentsdbTest.java:
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import lombok.extern.slf4j.Slf4j;
import net.opentsdb.core.TSDB;
import net.opentsdb.uid.NoSuchUniqueName;
import net.opentsdb.uid.UniqueId.UniqueIdType;
import net.opentsdb.utils.Config;
import java.net.URL;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.ResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.ArrayList;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.math.*;
import java.lang.reflect.Method;
public class OpentsdbTest{
//static { System.setProperty("logback.configurationFile", "/home/ubuntu/fang/opentsdb/opentsdbtest/logback.xml");}
static { System.setProperty("logback.configurationFile", "/etc/opentsdb/logback.xml");}
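// Command-line flags accepted by main() below (all optional, defaults in code):
//   -dataDir <dir>        directory containing testdata<N>.csv files
//   -numofFiles <n>       number of CSV files to load; 0 (the default) runs queries instead
//   -writeClients <n>     number of concurrent WriteThread writers
//   -rowsPerRequest <n>   CSV rows batched into one /api/put request
//   -sql q1|q2|q3|q4      query scenario to run in query mode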
public static void main(String args[]) {
// begin to parse argument
String datadir = "/home/ubuntu/testdata";
String sqlchoice = "q1";
int numOfRows = 1000000;
int numOfFiles = 0;
int numOfClients = 1;
int rowsPerRequest = 1;
for (int i = 0; i < args.length; ++i) {
if (args[i].equalsIgnoreCase("-dataDir")) {
if (i < args.length - 1) {
datadir = args[++i];
}
} else if (args[i].equalsIgnoreCase("-numofFiles")) {
if (i < args.length - 1) {
numOfFiles = Integer.parseInt(args[++i]);
}
} else if (args[i].equalsIgnoreCase("-rowsPerRequest")) {
if (i < args.length - 1) {
rowsPerRequest = Integer.parseInt(args[++i]);
}
} else if (args[i].equalsIgnoreCase("-writeClients")) {
if (i < args.length - 1) {
numOfClients = Integer.parseInt(args[++i]);
}
} else if (args[i].equalsIgnoreCase("-sql")) {
sqlchoice = args[++i];
}
}
System.out.println("parameters:\n");
if (numOfFiles >0) {
// write data
System.out.printf("----dataDir:%s\n", datadir);
System.out.printf("----numOfFiles:%d\n", numOfFiles);
System.out.printf("----numOfClients:%d\n", numOfClients);
System.out.printf("----rowsPerRequest:%d\n", rowsPerRequest);
try {
// begin to insert data
System.out.printf("----begin to insert data\n");
long startTime = System.currentTimeMillis();
int a = numOfFiles/numOfClients;
int b = numOfFiles%numOfClients;
int last = 0;
WriteThread[] writethreads = new WriteThread[numOfClients];
int[] wargs = new int[2]; // wargs[0]: rows to read from each file, wargs[1]: rows per request
wargs[0] = numOfRows;
wargs[1] = rowsPerRequest;
int fstart =0;
int fend =0;
for (int i = 0; i<numOfClients; ++i) {
if (i<b) {
fstart = last;
fend = last+a;
last = last+a+1;
writethreads[i] = new WriteThread(fstart,fend,wargs,datadir);
System.out.printf("----Thread %d begin to write\n",i);
writethreads[i].start();
} else {
fstart = last;
fend = last+a-1;
last = last+a;
writethreads[i] = new WriteThread(fstart,fend,wargs,datadir);
System.out.printf("----Thread %d begin to write\n",i);
writethreads[i].start();
}
}
for (int i =0; i<numOfClients; ++i) {
try {
writethreads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long stopTime = System.currentTimeMillis();
float elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
float speeds = numOfRows*numOfFiles/elapseTime;
System.out.printf("---- insertation speed: %f Rows/Second\n",speeds);
} catch (Exception ex) {
ex.printStackTrace();
System.exit(1);
} finally {
System.out.printf("---- insertion end\n");
}
// above:write part; below: read part;
} else {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
String filter_reg;
String get_url;
long startTime;
long stopTime;
float elapseTime;
CloseableHttpResponse responseBody;
StringEntity stringEntity;
HttpPost httpPost;
String qjson;
for (int ig = 10; ig <110; ig = ig+10) {
if (ig == 10) {
filter_reg = "\\b[0-9]\\b";
} else {
filter_reg = "\\b" + "([0-9]|"
+ "[" + "1" + "-"
+ Integer.toString(ig/10-1) + "][0-9])" +"\\b";
}
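// filter_reg matches devgroup ids 0 .. ig-1; e.g. for ig = 30 the pattern is
// \b([0-9]|[1-2][0-9])\b, i.e. groups 0-29.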
switch (sqlchoice) {
case "q1":
get_url = "http://192.168.1.114:4242/api/query?";
/*
get_url = get_url + "start=1563249700&m=none:temperature{devgroup=";
get_url = get_url + String.valueOf(ig-10) +"}";
*/
startTime = System.currentTimeMillis();
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"none\",\n" +
" \"metric\": \"temperature\",\n" +
" \"tags\": {\n" +
" \"devgroup\": " + "\"" + Integer.toString(ig-10) + "\"" + "\n" +
" }\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
/*
System.out.println(responseBody.getStatusLine());
System.out.println(qjson);
*/
responseBody.close();
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to get data when devgroup = %d\n",elapseTime, ig-10);
break;
case "q2":
//count
startTime = System.currentTimeMillis();
get_url = "http://192.168.1.114:4242/api/query?";
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"count\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupby\": false\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to count data when devgroup < %d\n",elapseTime, ig);
responseBody.close();
//avg
startTime = System.currentTimeMillis();
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"avg\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupby\": false\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to avg data when devgroup < %d\n",elapseTime, ig);
responseBody.close();
//sum
startTime = System.currentTimeMillis();
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"sum\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" +",\n" +
" \"groupby\": false\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to sum data when devgroup < %d\n",elapseTime, ig);
responseBody.close();
//max
startTime = System.currentTimeMillis();
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"max\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupby\": false\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to max data when devgroup < %d\n",elapseTime, ig);
responseBody.close();
//min
startTime = System.currentTimeMillis();
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"min\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupby\": false\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to min data when devgroup < %d\n",elapseTime, ig);
responseBody.close();
break;
case "q3":
startTime = System.currentTimeMillis();
get_url = "http://192.168.1.114:4242/api/query?";
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"count\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupBy\": true\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"aggregator\": \"sum\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupBy\": true\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"aggregator\": \"avg\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupBy\": true\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
/*
System.out.println(responseBody.getStatusLine());
System.out.println(qjson);
*/
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to group data by devgroup when devgroup < %d\n",elapseTime, ig);
responseBody.close();
break;
case "q4":
startTime = System.currentTimeMillis();
get_url = "http://192.168.1.114:4242/api/query?";
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"none\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupBy\": false\n" +
" }\n" +
" ],\n" +
" \"downsample\": \"1m-sum\"\n" +
" },\n" +
" {\n" +
" \"aggregator\": \"none\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupBy\": false\n" +
" }\n" +
" ],\n" +
" \"downsample\": \"1m-avg\"\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
/*
System.out.println(responseBody.getStatusLine());
System.out.println(qjson);
*/
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to group data by time when devgroup < %d\n",elapseTime, ig);
responseBody.close();
break;
}
}
httpclient.close();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("query end:----\n");
} // end write or query
System.exit(0);
}// end main
}// end class
WriteThread.java:
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import lombok.extern.slf4j.Slf4j;
import net.opentsdb.core.TSDB;
import net.opentsdb.uid.NoSuchUniqueName;
import net.opentsdb.uid.UniqueId.UniqueIdType;
import net.opentsdb.utils.Config;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.math.*;
import java.lang.reflect.Method;
public class WriteThread extends Thread {
private int[] wargs; // wargs[0]: rows to read from each file, wargs[1]: rows per request
private String fdir;
private int fstart;
private int fend;
public WriteThread (int fstart, int fend,int[] wargs, String fdir) {
this.fstart = fstart;
this.fend = fend;
this.fdir = fdir;
this.wargs = wargs;
}
// begin to insert in this thread
public void run() {
StringEntity stringEntity;
String port = "4242";
String put_url = "http://192.168.1.114:"+port+"/api/put?summary";
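// The "summary" parameter makes /api/put return counts of successful and
// failed data points instead of an empty 204 response.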
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
/*
httpclient.getHttpConnectionManager().getParams()
.setConnectionTimeout(1000);
httpclient.getHttpConnectionManager().getParams()
.setSoTimeout(5000);
*/
for (int i=fstart; i<=fend; i++) {
String csvfile;
csvfile = fdir + "/testdata"+ Integer.toString(i)+".csv";
BufferedReader br = null;
String line = "";
String cvsSplitBy = " ";
try {
br = new BufferedReader(new FileReader(csvfile));
System.out.println("---- begin to read file " +csvfile+"\n");
for (int itotalrow =0; itotalrow<wargs[0]; itotalrow=itotalrow+wargs[1]) {
HttpPost httpPost = new HttpPost(put_url);
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
String totaljson = "[\n";
for (int irow =0; irow<wargs[1]; ++irow) {
line = br.readLine();
if (line !=null) {
String[] meter = line.split(cvsSplitBy);
// devid, devname,devgroup,ts,temperature,humidity
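// The CSV timestamp is in milliseconds and is converted to seconds below;
// every CSV row yields two data points (temperature and humidity) in the batch.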
BigInteger timestamp = new BigInteger(meter[3]);
timestamp = timestamp.divide(BigInteger.valueOf(1000));
long ts = timestamp.longValue();
int temperature = Integer.parseInt(meter[4]);
float humidity = Float.parseFloat(meter[5]);
String onejson = " {\n" +
" \"metric\": \"temperature\",\n" +
" \"timestamp\": " + String.valueOf(ts) + ",\n" +
" \"value\": " + String.valueOf(temperature) + ",\n" +
" \"tags\" : {\n" +
" \"devid\":" +" \"" + meter[0] + "\",\n" +
" \"devname\":" +" \"" + meter[1] + "\",\n" +
" \"devgroup\":" +" \"" + meter[2] + "\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"metric\": \"humidity\",\n" +
" \"timestamp\": " + String.valueOf(ts) + ",\n" +
" \"value\": " + String.valueOf(humidity) + ",\n" +
" \"tags\" : {\n" +
" \"devid\":" +" \"" + meter[0] + "\",\n" +
" \"devname\":" +" \"" + meter[1] + "\",\n" +
" \"devgroup\":" +" \"" + meter[2] + "\"\n" +
" }\n";
if (irow == 0) {
totaljson = totaljson + onejson;
} else if (irow < wargs[1]) {
totaljson = totaljson + " },\n" + onejson;
}
} //end one line reading
} //end on batch put
totaljson = totaljson + " }\n]";
stringEntity = new StringEntity(totaljson);
httpPost.setEntity(stringEntity);
CloseableHttpResponse responseBody = httpclient.execute(httpPost);
/*
System.out.println(responseBody.getStatusLine());
System.out.println(totaljson);
*/
responseBody.close();
}// end one file reading
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}//end file iteration
httpclient.close();
} catch (Exception e) {
e.printStackTrace();
System.out.println("failed to connect");
}
}//end run
}//end class
logback.xml:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- The log path is the mobileLog folder under Tomcat's logs directory; logback creates the folder automatically, so this setting alone is enough to get log files written:
<substitutionProperty name="logbase" value="${catalina.base}/logs/mobileLog/"
/> -->
<substitutionProperty name="logbase" value="${user.dir}/logs/" />
<!-- This property configures where the output files go. -->
<jmxConfigurator />
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%date [%thread] %-5level %logger{80} - %msg%n</pattern>
</layout>
</appender>
<!-- File appender (rolls by file size: once a file exceeds the configured size it is archived) -->
<appender name="logfile"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<Encoding>UTF-8</Encoding>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<File>${logbase}%d{yyyy-MM-dd}.log.html</File>
<FileNamePattern>${logbase}.%d{yyyy-MM-dd}.log.html.zip</FileNamePattern>
</rollingPolicy>
<triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<MaxFileSize>2MB</MaxFileSize>
</triggeringPolicy>
<layout class="ch.qos.logback.classic.html.HTMLLayout">
<pattern>%date%level%thread%10logger%file%line%msg</pattern>
</layout>
</appender>
<!-- Output by Email -->
<!--
<appender name="Email" class="ch.qos.logback.classic.net.SMTPAppender">
<SMTPHost>stmp host name</SMTPHost>
<To>Email Address</To>
<To>Email Address</To>
<From>Email Address</From>
<Subject>TESTING Email Function: %logger{20} - %m</Subject>
<layout class="ch.qos.logback.classic.html.HTMLLayout">
<pattern>%date%level%thread%10logger%file%line%msg</pattern>
</layout>
</appender> -->
<!-- Output to Database -->
<!--
<appender name="DB" class="ch.qos.logback.classic.db.DBAppender">
<connectionSource class="ch.qos.logback.core.db.DriverManagerConnectionSource">
<driverClass>com.mysql.jdbc.Driver</driverClass>
<url>jdbc:mysql://localhost:3306/test</url>
<user>root</user>
<password>trend_dev</password>
</connectionSource>
</appender> -->
<root>
<level value="debug" />
<appender-ref ref="logfile" />
<appender-ref ref="logfile" />
</root>
</configuration>