Commit 9658e5ec authored by Minglei Jin

Merge branch 'v2.4' into fix/TS-1286

Too many changes to display. To preserve performance, only 1000 of 1000+ files are displayed.
@@ -17,10 +17,3 @@
[submodule "src/plugins/taosadapter"]
path = src/plugins/taosadapter
url = https://github.com/taosdata/taosadapter
[submodule "tests"]
path = tests
url = https://github.com/taosdata/tests
branch = 2.4
[submodule "examples/rust"]
path = examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git
@@ -11,7 +11,11 @@ def sync_source() {
sh '''
cd ${WKC}
[ -f src/connector/grafanaplugin/README.md ] && rm -f src/connector/grafanaplugin/README.md > /dev/null || echo "failed to remove grafanaplugin README.md"
git reset --hard >/dev/null
git reset --hard
git fetch
cd ${WK}
git reset --hard
git fetch
'''
script {
if (env.CHANGE_TARGET == 'master') {
@@ -37,64 +41,65 @@ def sync_source() {
}
}
sh '''
export TZ=Asia/Harbin
cd ${WKC}
git reset --hard
git remote prune origin
[ -f src/connector/grafanaplugin/README.md ] && rm -f src/connector/grafanaplugin/README.md > /dev/null || echo "failed to remove grafanaplugin README.md"
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git reset --hard
git clean -dfx
git submodule update --init --recursive --remote
git submodule update --init --recursive
cd ${WK}
git reset --hard
'''
sh '''
cd ${WKCT}
git reset --hard
'''
script {
if (env.CHANGE_TARGET == 'master') {
sh '''
cd ${WK}
git checkout master
cd ${WKCT}
git checkout master
'''
} else if (env.CHANGE_TARGET == '2.0') {
sh '''
cd ${WK}
git checkout 2.0
cd ${WKCT}
git checkout 2.0
'''
} else if (env.CHANGE_TARGET == '2.4') {
sh '''
cd ${WK}
git checkout 2.4
cd ${WKCT}
git checkout 2.4
'''
} else {
sh '''
cd ${WK}
git checkout develop
cd ${WKCT}
git checkout develop
'''
}
}
sh '''
export TZ=Asia/Harbin
cd ${WK}
git pull >/dev/null
git clean -dfx
cd ${WKCT}
git pull >/dev/null
git clean -dfx
date
'''
script {
if (env.CHANGE_URL =~ /\/TDengine\//) {
sh '''
echo "match /TDengine/ repository"
cd ${WKC}
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
'''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
sh '''
echo "match /TDinternal/ repository"
cd ${WK}
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
'''
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
'''
}
}
sh '''
cd ${WKC}
git submodule update --init --recursive
'''
}
def pre_test() {
@@ -129,7 +134,6 @@ pipeline {
environment{
WK = '/var/data/jenkins/workspace/TDinternal'
WKC = '/var/data/jenkins/workspace/TDinternal/community'
WKCT = '/var/data/jenkins/workspace/TDinternal/community/tests'
LOGDIR = '/var/data/jenkins/workspace/log'
}
stages {
@@ -248,7 +252,7 @@ pipeline {
}
parallel {
stage ('build worker08_arm32') {
agent {label " worker08_arm32 "}
agent {label " worker08_arm32"}
steps {
timeout(time: 20, unit: 'MINUTES') {
pre_test()
......
Subproject commit 1c8924dc668e6aa848214c2fc54e3ace3f5bf8df
Subproject commit b07ad4279e062c4593bd5f729d2c0096dd79b703
# generate debug version:
# mkdir debug; cd debug; cmake -DCMAKE_BUILD_TYPE=Debug ..
# generate release version:
# mkdir release; cd release; cmake -DCMAKE_BUILD_TYPE=Release ..
CMAKE_MINIMUM_REQUIRED(VERSION 3.0...3.20)
PROJECT(TDengine)
SET(CMAKE_C_STANDARD 11)
#ADD_SUBDIRECTORY(examples/c)
ADD_SUBDIRECTORY(tsim)
ADD_SUBDIRECTORY(test/c)
ADD_SUBDIRECTORY(comparisonTest/tdengine)
### Prepare development environment
1. sudo apt install
build-essential cmake net-tools python-pip python-setuptools python3-pip
python3-setuptools valgrind psmisc curl
2. git clone <https://github.com/taosdata/TDengine>; cd TDengine
3. mkdir debug; cd debug; cmake ..; make; sudo make install
4. pip install ../src/connector/python; pip3 install
../src/connector/python
5. pip install numpy; pip3 install numpy fabric2 psutil pandas (numpy is
required only if you need to run querySort.py)
> Note: Both Python2 and Python3 are currently supported by the Python test
> framework. Since the Python Software Foundation stopped supporting Python2
> on January 1, 2020, new test cases should be developed to run correctly on
> Python3. For Python2, please keep compatibility where it adds no extra
> burden.
>
> If you use a newer Linux distribution such as Ubuntu 20.04, which no longer
> includes Python2, please do not install Python2-related packages.
>
> <https://nakedsecurity.sophos.com/2020/01/03/python-is-dead-long-live-python/> 
### How to run Python test suite
1. cd <TDengine>/tests/pytest
2. ./smoketest.sh # for smoke test
3. ./smoketest.sh -g # for memory leak detection test with valgrind
4. ./fulltest.sh # for full test
> Note1: TDengine daemon's configuration and data files are stored in the
> \<TDengine\>/sim directory. For historical reasons, this is the same
> location the TSIM scripts use, so after a TSIM script has run with sudo
> privileges, the directory is owned by that user and a Python script run as
> a normal user cannot write to it. You need to remove the directory
> completely before running the Python test cases. We should consider using
> two different locations for the TSIM and Python scripts.
> Note2: if you need to debug a crash with a core dump, manually edit
> smoketest.sh or fulltest.sh to add "ulimit -c unlimited" before the script
> line. You can then find the core file in \<TDengine\>/tests/pytest after
> the program crashes.
### How to add a new test case
**1. TSIM test cases:**
TSIM is the testing framework that has been used internally. As a legacy system, it is still used to run the test cases we developed in the past, but we are turning to Python for new test cases and gradually abandoning TSIM.
**2. Python test cases:**
**2.1 Please refer to \<TDengine\>/tests/pytest/insert/basic.py to add a new
test case.** The new test case must implement three functions: self.init()
and self.stop() can simply be copied from insert/basic.py, while the test
logic is implemented in self.run(). Refer to the code in the util directory
for more information.
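A minimal sketch modeled on insert/basic.py follows; the tdLog, tdSql, and
tdCases helpers come from the util directory, and exact details may vary
between framework versions:

    # -*- coding: utf-8 -*-
    from util.log import *
    from util.cases import *
    from util.sql import *

    class TDTestCase:
        def init(self, conn, logSql):
            # boilerplate copied from insert/basic.py
            tdLog.debug("start to execute %s" % __file__)
            tdSql.init(conn.cursor(), logSql)

        def run(self):
            # the actual test logic goes here
            tdSql.prepare()
            tdSql.execute("create table tb (ts timestamp, speed int)")
            tdSql.execute("insert into tb values (now, 1)")
            tdSql.query("select * from tb")
            tdSql.checkRows(1)

        def stop(self):
            tdSql.close()
            tdLog.success("%s successfully executed" % __file__)

    tdCases.addLinux(__file__, TDTestCase())

You can run the new case on its own from \<TDengine\>/tests/pytest with
python3 ./test.py -f \<your-case\>.py before adding it to smoketest.sh.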
**2.2 Edit smoketest.sh to add the path and filename of the new test case**
Note: The Python test framework may continue to be improved in the future,
hopefully to provide more functionality and make test cases easier to write.
The method of writing test cases described above may change accordingly.
**2.3 What test.py does in detail:**
test.py is the entry program for test case execution and monitoring. It
accepts the following options:
-f --file, specifies the test case file to execute
-p --path, specifies the deployment path
-m --master, specifies the master server IP for cluster deployment
-c --cluster, tests cluster functionality
-s --stop, terminates all running nodes
-g --valgrind, runs under valgrind for memory leak detection
-h --help, displays help
**2.4 What util/log.py does in detail:**
log.py is quite simple; its main purpose is to print output in different
colors as needed. Call success() on successful test case execution; it
prints green text. Call exit() on test failure; it prints red text and
exits the program.
**util/log.py**
...
    def info(self, info):
        print("%s %s" % (datetime.datetime.now(), info))

    def sleep(self, sec):
        print("%s sleep %d seconds" % (datetime.datetime.now(), sec))
        time.sleep(sec)

    def debug(self, err):
        print("\033[1;36m%s %s\033[0m" % (datetime.datetime.now(), err))

    def success(self, info):
        print("\033[1;32m%s %s\033[0m" % (datetime.datetime.now(), info))

    def notice(self, err):
        print("\033[1;33m%s %s\033[0m" % (datetime.datetime.now(), err))

    def exit(self, err):
        print("\033[1;31m%s %s\033[0m" % (datetime.datetime.now(), err))
        sys.exit(1)

    def printNoPrefix(self, info):
        print("\033[1;36m%s\033[0m" % (info))
...
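In a test case these methods are typically called through the shared tdLog
instance that util/log.py exports (a sketch; names follow the framework's
conventions):

    from util.log import tdLog

    tdLog.info("start checking insert results")           # plain output with timestamp
    tdLog.success("%s successfully executed" % __file__)  # green text on success
    # tdLog.exit("unexpected result")                     # red text, then sys.exit(1)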
**2.5 What util/sql.py does in detail:**
sql.py is mainly used to execute SQL statements that manipulate the
database; the code is excerpted and commented as follows:
**util/sql.py**
# prepare() is mainly used to set up the environment for testing tables and
data; it creates and selects the test database db. Do not call prepare() if
you need to test database operation commands themselves.
def prepare(self):
tdLog.info("prepare database:db")
self.cursor.execute('reset query cache')
self.cursor.execute('drop database if exists db')
self.cursor.execute('create database db')
self.cursor.execute('use db')
...
# query() is mainly used to execute select statements with valid syntax
def query(self, sql):
...
# error() is mainly used to execute a select statement with invalid syntax;
the resulting error is caught and treated as expected behavior. If no error
is raised, the test has failed.
def error(self, sql):
...
# checkRows() checks the number of rows returned after calling
query(select ...).
def checkRows(self, expectRows):
...
# checkData() checks the returned result data after calling
query(select ...); failure to meet the expectation fails the test.
def checkData(self, row, col, data):
...
# getData() returns the result data after calling query(select ...).
def getData(self, row, col):
...
# execute() is used to execute sql and return the number of affected rows
def execute(self, sql):
...
# executeTimes() executes the same sql statement multiple times
def executeTimes(self, sql, times):
...
# checkAffectedRows() checks whether the number of affected rows matches
the expectation
def checkAffectedRows(self, expectAffectedRows):
...
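Taken together, the body of a typical run() combines these helpers roughly
as follows (a sketch; the SQL statements are illustrative):

    tdSql.prepare()                             # recreate and select database db
    tdSql.execute("create table tb (ts timestamp, speed int)")
    tdSql.execute("insert into tb values (now, 1)")
    tdSql.query("select ts, speed from tb")
    tdSql.checkRows(1)                          # exactly one row expected
    tdSql.checkData(0, 1, 1)                    # row 0, column 1 should equal 1
    tdSql.error("select * from no_such_table")  # invalid SQL must raise an error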
### CI submission acceptance principles
- Every commit / PR must compile. Warnings are currently treated as errors,
so warnings must also be resolved.
- Test cases that already exist must pass.
- Because CI is critical to the build and automated test process, manually
test a new test case and iterate as many times as possible before adding it,
to ensure that it provides stable and reliable results once added.
> Note: In the future, stress testing, performance testing, code style
> checks, and other features will be added on top of functional testing,
> according to requirements and test development progress.
def pre_test(){
sh '''
sudo rmtaos||echo 'no taosd installed'
'''
sh '''
cd ${WKC}
git reset --hard
git checkout $BRANCH_NAME
git pull
git submodule update
cd ${WK}
git reset --hard
git checkout $BRANCH_NAME
git pull
export TZ=Asia/Harbin
date
rm -rf ${WK}/debug
mkdir debug
cd debug
cmake .. > /dev/null
make > /dev/null
make install > /dev/null
pip3 install ${WKC}/src/connector/python
'''
return 1
}
def pre_test_p(){
sh '''
sudo rmtaos||echo 'no taosd installed'
'''
sh '''
cd ${WKC}
git reset --hard
git checkout $BRANCH_NAME
git pull
git submodule update
cd ${WK}
git reset --hard
git checkout $BRANCH_NAME
git pull
export TZ=Asia/Harbin
date
rm -rf ${WK}/debug
mkdir debug
cd debug
cmake .. > /dev/null
make > /dev/null
make install > /dev/null
pip3 install ${WKC}/src/connector/python
'''
return 1
}
pipeline {
agent none
environment{
WK = '/data/lib/jenkins/workspace/TDinternal'
WKC= '/data/lib/jenkins/workspace/TDinternal/community'
}
stages {
stage('Parallel test stage') {
parallel {
stage('pytest') {
agent{label 'slad1'}
steps {
pre_test_p()
sh '''
cd ${WKC}/tests
find pytest -name '*'sql|xargs rm -rf
./test-all.sh pytest
date'''
}
}
stage('test_b1') {
agent{label 'slad2'}
steps {
pre_test()
sh '''
cd ${WKC}/tests
./test-all.sh b1
date'''
}
}
stage('test_crash_gen') {
agent{label "slad3"}
steps {
pre_test()
sh '''
cd ${WKC}/tests/pytest
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
./handle_crash_gen_val_log.sh
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
./handle_taosd_val_log.sh
'''
}
sh '''
nohup taosd >/dev/null &
sleep 10
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/gotest
bash batchtest.sh
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/examples/python/PYTHONConnectorChecker
python3 PythonChecker.py
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/examples/JDBC/JDBCDemo/
mvn clean package >/dev/null
java -jar target/JdbcRestfulDemo-jar-with-dependencies.jar
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cp -rf ${WKC}/tests/examples/nodejs ${JENKINS_HOME}/workspace/
cd ${JENKINS_HOME}/workspace/nodejs
node nodejsChecker.js host=localhost
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${JENKINS_HOME}/workspace/C#NET/src/CheckC#
dotnet run
'''
}
sh '''
pkill -9 taosd || echo 1
cd ${WKC}/tests
./test-all.sh b2
date
'''
sh '''
cd ${WKC}/tests
./test-all.sh full unit
date'''
}
}
stage('test_valgrind') {
agent{label "slad4"}
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
nohup taosd >/dev/null &
sleep 10
python3 concurrent_inquiry.py -c 1
'''
}
sh '''
cd ${WKC}/tests
./test-all.sh full jdbc
date'''
sh '''
cd ${WKC}/tests/pytest
./valgrind-test.sh 2>&1 > mem-error-out.log
./handle_val_log.sh
date
cd ${WKC}/tests
./test-all.sh b3
date'''
sh '''
date
cd ${WKC}/tests
./test-all.sh full example
date'''
}
}
stage('arm64_build'){
agent{label 'arm64'}
steps{
sh '''
cd ${WK}
git fetch
git checkout develop
git pull
cd ${WKC}
git fetch
git checkout develop
git pull
git submodule update
cd ${WKC}/packaging
./release.sh -v cluster -c aarch64 -n 2.0.0.0 -m 2.0.0.0
'''
}
}
stage('arm32_build'){
agent{label 'arm32'}
steps{
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WK}
git fetch
git checkout develop
git pull
cd ${WKC}
git fetch
git checkout develop
git pull
git submodule update
cd ${WKC}/packaging
./release.sh -v cluster -c aarch32 -n 2.0.0.0 -m 2.0.0.0
'''
}
}
}
}
}
}
post {
success {
emailext (
subject: "PR-result: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' SUCCESS",
body: """<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${env.BRANCH_NAME}</li>
<li>构建结果:<span style="color:green"> Successful </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${env.CHANGE_AUTHOR}</li>
<li>提交信息:${env.CHANGE_TITLE}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
</div>
</ul>
</td>
</tr>
</table>
</body>
</html>""",
to: "yqliu@taosdata.com,pxiao@taosdata.com",
from: "support@taosdata.com"
)
}
failure {
emailext (
subject: "PR-result: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' FAIL",
body: """<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${env.BRANCH_NAME}</li>
<li>构建结果:<span style="color:red"> Failure </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${env.CHANGE_AUTHOR}</li>
<li>提交信息:${env.CHANGE_TITLE}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
</div>
</ul>
</td>
</tr>
</table>
</body>
</html>""",
to: "yqliu@taosdata.com,pxiao@taosdata.com",
from: "support@taosdata.com"
)
}
}
}
\ No newline at end of file
datastax-java-driver {
basic.request {
timeout = 200000 seconds
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cassandra.test</groupId>
<artifactId>cassandratest</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-plugins</artifactId>
<version>30</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<mainClass>CassandraTest</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<name>cassandratest</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-mapper-runtime</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>2.8.2</version>
</dependency>
</dependencies>
</project>
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.session.*;
import com.datastax.oss.driver.api.core.config.*;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
//import com.datastax.driver.core.Cluster;
//import com.datastax.driver.core.Cluster;
import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Random;
import java.math.*;
import java.lang.reflect.Method;
public class CassandraTest{
public static void main(String args[]) {
// begin to parse argument
String datadir = "/home/ubuntu/testdata";
String sqlfile = "/home/ubuntu/fang/cassandra/q1.txt";
String cfgfile = "/home/ubuntu/fang/cassandra/application.conf";
boolean q4flag = false;
int numOfRows = 1000000;
int numOfFiles =0;
int numOfClients =0;
int rowsPerRequest =0;
for (int i = 0; i < args.length; ++i) {
if (args[i].equalsIgnoreCase("-dataDir")) {
if (i < args.length - 1) {
datadir = args[++i];
}
} else if (args[i].equalsIgnoreCase("-numofFiles")) {
if (i < args.length - 1) {
numOfFiles = Integer.parseInt(args[++i]);
}
} else if (args[i].equalsIgnoreCase("-rowsPerRequest")) {
if (i < args.length - 1) {
rowsPerRequest = Integer.parseInt(args[++i]);
}
} else if (args[i].equalsIgnoreCase("-writeClients")) {
if (i < args.length - 1) {
numOfClients = Integer.parseInt(args[++i]);
}
} else if (args[i].equalsIgnoreCase("-sql")) {
sqlfile = args[++i];
} else if (args[i].equalsIgnoreCase("-timetest")) {
q4flag = true;
} else if (args[i].equalsIgnoreCase("-conf")) {
cfgfile = args[++i];
}
}
// the configuration file below makes sure there is no timeout error
File confile = new File(cfgfile);
System.out.println("parameters\n");
if (numOfFiles >0) {
// write data
System.out.printf("----dataDir:%s\n", datadir);
System.out.printf("----numOfFiles:%d\n", numOfFiles);
System.out.printf("----numOfClients:%d\n", numOfClients);
System.out.printf("----rowsPerRequest:%d\n", rowsPerRequest);
// connect to cassandra server
System.out.printf("----connecting to cassandra server\n");
try {
CqlSession session = CqlSession.builder()
.withConfigLoader(DriverConfigLoader.fromFile(confile))
.build();
session.execute("drop keyspace if exists cassandra");
session.execute("CREATE KEYSPACE if not exists cassandra WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}");
if (q4flag) {
session.execute("create table if not exists cassandra.test (devid int, devname text, devgroup int, ts bigint, minute bigint, temperature int, humidity float ,primary key (minute,ts,devgroup,devid,devname))");
} else {
session.execute("create table if not exists cassandra.test (devid int, devname text, devgroup int, ts bigint, temperature int, humidity float ,primary key (devgroup,devid,devname,ts))");
}
session.close();
System.out.printf("----created keyspace cassandra and table test\n");
// begin to insert data
System.out.printf("----begin to insert data\n");
long startTime = System.currentTimeMillis();
int a = numOfFiles/numOfClients;
int b = numOfFiles%numOfClients;
int last = 0;
WriteThread[] writethreads = new WriteThread[numOfClients];
int[] wargs = new int[2]; // data file start, end
wargs[0] = numOfRows; //rows to be read from each file
wargs[1] = rowsPerRequest;
int fstart =0;
int fend =0;
for (int i = 0; i<numOfClients; ++i) {
if (i<b) {
fstart = last;
fend = last+a;
last = last+a+1;
writethreads[i] = new WriteThread(fstart,fend,wargs,datadir,q4flag);
System.out.printf("----Thread %d begin to write\n",i);
writethreads[i].start();
} else {
fstart = last;
fend = last+a-1;
last = last+a;
writethreads[i] = new WriteThread(fstart,fend,wargs,datadir,q4flag);
System.out.printf("----Thread %d begin to write\n",i);
writethreads[i].start();
}
}
for (int i =0; i<numOfClients; ++i) {
try {
writethreads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long stopTime = System.currentTimeMillis();
float elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
float speeds = numOfRows*numOfFiles/elapseTime;
System.out.printf("---- insertation speed: %f Rows/Second\n",speeds);
} catch (Exception ex) {
ex.printStackTrace();
System.exit(1);
} finally {
System.out.printf("---- insertion end\n");
}
// above:write part; below: read part;
} else {
// query data begin
System.out.printf("----sql command file:%s\n", sqlfile);
// connect to cassandra server
try {
CqlSession session = CqlSession.builder()
.withConfigLoader(DriverConfigLoader.fromFile(confile))
.build();
//session.execute("use cassandra;");
BufferedReader br = null;
String line = "";
try {
br = new BufferedReader(new FileReader(sqlfile));
while ((line = br.readLine()) != null && line.length()>10) {
long startTime = System.currentTimeMillis();
// begin to query one line command //
// end querying one line command
try {
ResultSet results = session.execute(line);
long icounter = 0;
for (Row row : results) {
icounter++;
}
long stopTime = System.currentTimeMillis();
float elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("----spend %f seconds to query: %s\n", elapseTime, line);
} catch (Exception ex) {
ex.printStackTrace();
System.out.printf("---- query failed!\n");
System.exit(1);
}
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
session.close();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
System.out.println("query end:----\n");
}
} // end write or query
System.exit(0);
}// end main
}// end class
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.math.*;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.session.*;
import com.datastax.oss.driver.api.core.config.*;
public class WriteThread extends Thread {
private int[] wargs; // fstart, fend, rows to be read, rows perrequest
private String fdir;
private int fstart;
private int fend;
private boolean q4flag;
public WriteThread (int fstart, int fend,int[] wargs, String fdir, boolean q4flag) {
this.fstart = fstart;
this.fend = fend;
this.fdir = fdir;
this.wargs = wargs;
this.q4flag = q4flag;
}
// begin to insert in this thread
public void run() {
/*
// this configuration file makes sure no timeout error
File confile = new File("/home/ubuntu/fang/cassandra/application.conf");
*/
// connect to server
try {
CqlSession session = CqlSession.builder()
//.withConfigLoader(DriverConfigLoader.fromFile(confile))
.build();
//session.execute("use cassandra");
int tominute = 6000;
for (int i=fstart; i<=fend; i++) {
String csvfile;
csvfile = fdir + "/testdata"+ Integer.toString(i)+".csv";
BufferedReader br = null;
String line = "";
String cvsSplitBy = " ";
try {
br = new BufferedReader(new FileReader(csvfile));
System.out.println("---- begin to read file " +csvfile+"\n");
for (int itotalrow =0; itotalrow<wargs[0]; itotalrow=itotalrow+wargs[1]) {
String cqlstr = "BEGIN BATCH ";
for (int irow =0; irow<wargs[1]; ++irow) {
line = br.readLine();
if (line !=null) {
String[] meter = line.split(cvsSplitBy);
BigInteger tminute = new BigInteger(meter[3]);
tminute = tminute.divide(BigInteger.valueOf(tominute));
if (q4flag) {
cqlstr = cqlstr + "insert into cassandra.test (devid,devname,devgroup,ts, minute,temperature,humidity) values ";
cqlstr = cqlstr +"("+meter[0] +"," +"'" +meter[1] +"'" +"," +meter[2] +"," + meter[3] +",";
cqlstr = cqlstr +tminute.toString() +"," +meter[4] +"," +meter[5] +");";
} else {
cqlstr = cqlstr + "insert into cassandra.test (devid,devname,devgroup,ts,temperature,humidity) values ";
cqlstr = cqlstr +"("+meter[0] +"," +"'" +meter[1] +"'" +"," +meter[2] +"," + meter[3] +",";
cqlstr = cqlstr +meter[4] +"," +meter[5] +");";
}
} // if this line is not null
}//end row iteration in one batch
cqlstr = cqlstr+" APPLY BATCH;";
try {
//System.out.println(cqlstr+"----\n");
session.execute(cqlstr);
} catch (Exception ex) {
ex.printStackTrace();
}
}// end one file reading
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}//end file iteration
session.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}//end run
}//end class
select * from cassandra.test where devgroup=0 allow filtering;
select * from cassandra.test where devgroup=10 allow filtering;
select * from cassandra.test where devgroup=20 allow filtering;
select * from cassandra.test where devgroup=30 allow filtering;
select * from cassandra.test where devgroup=40 allow filtering;
select * from cassandra.test where devgroup=50 allow filtering;
select * from cassandra.test where devgroup=60 allow filtering;
select * from cassandra.test where devgroup=70 allow filtering;
select * from cassandra.test where devgroup=80 allow filtering;
select * from cassandra.test where devgroup=90 allow filtering;
select count(*) from cassandra.test where devgroup<10 allow filtering;
select count(*) from cassandra.test where devgroup<20 allow filtering;
select count(*) from cassandra.test where devgroup<30 allow filtering;
select count(*) from cassandra.test where devgroup<40 allow filtering;
select count(*) from cassandra.test where devgroup<50 allow filtering;
select count(*) from cassandra.test where devgroup<60 allow filtering;
select count(*) from cassandra.test where devgroup<70 allow filtering;
select count(*) from cassandra.test where devgroup<80 allow filtering;
select count(*) from cassandra.test where devgroup<90 allow filtering;
select count(*) from cassandra.test allow filtering;
select avg(temperature) from cassandra.test where devgroup<10 allow filtering;
select avg(temperature) from cassandra.test where devgroup<20 allow filtering;
select avg(temperature) from cassandra.test where devgroup<30 allow filtering;
select avg(temperature) from cassandra.test where devgroup<40 allow filtering;
select avg(temperature) from cassandra.test where devgroup<50 allow filtering;
select avg(temperature) from cassandra.test where devgroup<60 allow filtering;
select avg(temperature) from cassandra.test where devgroup<70 allow filtering;
select avg(temperature) from cassandra.test where devgroup<80 allow filtering;
select avg(temperature) from cassandra.test where devgroup<90 allow filtering;
select avg(temperature) from cassandra.test allow filtering;
select sum(temperature) from cassandra.test where devgroup<10 allow filtering;
select sum(temperature) from cassandra.test where devgroup<20 allow filtering;
select sum(temperature) from cassandra.test where devgroup<30 allow filtering;
select sum(temperature) from cassandra.test where devgroup<40 allow filtering;
select sum(temperature) from cassandra.test where devgroup<50 allow filtering;
select sum(temperature) from cassandra.test where devgroup<60 allow filtering;
select sum(temperature) from cassandra.test where devgroup<70 allow filtering;
select sum(temperature) from cassandra.test where devgroup<80 allow filtering;
select sum(temperature) from cassandra.test where devgroup<90 allow filtering;
select sum(temperature) from cassandra.test allow filtering;
select max(temperature) from cassandra.test where devgroup<10 allow filtering;
select max(temperature) from cassandra.test where devgroup<20 allow filtering;
select max(temperature) from cassandra.test where devgroup<30 allow filtering;
select max(temperature) from cassandra.test where devgroup<40 allow filtering;
select max(temperature) from cassandra.test where devgroup<50 allow filtering;
select max(temperature) from cassandra.test where devgroup<60 allow filtering;
select max(temperature) from cassandra.test where devgroup<70 allow filtering;
select max(temperature) from cassandra.test where devgroup<80 allow filtering;
select max(temperature) from cassandra.test where devgroup<90 allow filtering;
select max(temperature) from cassandra.test allow filtering;
select min(temperature) from cassandra.test where devgroup<10 allow filtering;
select min(temperature) from cassandra.test where devgroup<20 allow filtering;
select min(temperature) from cassandra.test where devgroup<30 allow filtering;
select min(temperature) from cassandra.test where devgroup<40 allow filtering;
select min(temperature) from cassandra.test where devgroup<50 allow filtering;
select min(temperature) from cassandra.test where devgroup<60 allow filtering;
select min(temperature) from cassandra.test where devgroup<70 allow filtering;
select min(temperature) from cassandra.test where devgroup<80 allow filtering;
select min(temperature) from cassandra.test where devgroup<90 allow filtering;
select min(temperature) from cassandra.test allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<10 group by devgroup allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<20 group by devgroup allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<30 group by devgroup allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<40 group by devgroup allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<50 group by devgroup allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<60 group by devgroup allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<70 group by devgroup allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<80 group by devgroup allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<90 group by devgroup allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test group by devgroup allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<10 group by minute allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<20 group by minute allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<30 group by minute allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<40 group by minute allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<50 group by minute allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<60 group by minute allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<70 group by minute allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<80 group by minute allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test where devgroup<90 group by minute allow filtering;
select count(temperature), sum(temperature), avg(temperature) from cassandra.test group by minute;
package com.taosdata.generator;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Random;
public class DataGenerator {
/*
* to simulate the changing behavior of humidity; the valid range of
* humidity is [0, 100]
*/
public static class ValueGen {
int center;
int range;
Random rand;
public ValueGen(int center, int range) {
this.center = center;
this.range = range;
this.rand = new Random();
}
double next() {
double v = this.rand.nextGaussian(); // sample from a standard normal distribution
if (v < -3) {
v = -3; // clamp to -3 sigma
}
if (v > 3) {
v = 3; // clamp to +3 sigma
}
return (this.range / 3.00) * v + center; // scale so the result stays within [center - range, center + range]
}
}
// data scale
private static int timestep = 1000; // sample time interval in milliseconds
private static long dataStartTime = 1563249700000L;
private static int deviceId = 0;
private static String tagPrefix = "dev_";
// MachineNum RowsPerMachine MachinesInOneFile
public static void main(String args[]) {
int numOfDevice = 10000;
int numOfFiles = 100;
int rowsPerDevice = 10000;
String directory = "~/";
for (int i = 0; i < args.length; i++) {
if (args[i].equalsIgnoreCase("-numOfDevices")) {
if (i < args.length - 1) {
numOfDevice = Integer.parseInt(args[++i]);
} else {
System.out.println("'-numOfDevices' requires a parameter, default is 10000");
}
} else if (args[i].equalsIgnoreCase("-numOfFiles")) {
if (i < args.length - 1) {
numOfFiles = Integer.parseInt(args[++i]);
} else {
System.out.println("'-numOfFiles' requires a parameter, default is 100");
}
} else if (args[i].equalsIgnoreCase("-rowsPerDevice")) {
if (i < args.length - 1) {
rowsPerDevice = Integer.parseInt(args[++i]);
} else {
System.out.println("'-rowsPerDevice' requires a parameter, default is 10000");
}
} else if (args[i].equalsIgnoreCase("-dataDir")) {
if (i < args.length - 1) {
directory = args[++i];
} else {
System.out.println("'-dataDir' requires a parameter, default is ~/testdata");
}
}
}
System.out.println("parameters");
System.out.printf("----dataDir:%s\n", directory);
System.out.printf("----numOfFiles:%d\n", numOfFiles);
System.out.printf("----numOfDevice:%d\n", numOfDevice);
System.out.printf("----rowsPerDevice:%d\n", rowsPerDevice);
int numOfDevPerFile = numOfDevice / numOfFiles;
long ts = dataStartTime;
// deviceId, time stamp, humid(int), temp(double), tagString(dev_deviceid)
int humidityDistRadius = 35;
int tempDistRadius = 17;
for (int i = 0; i < numOfFiles; ++i) { // prepare the data file
dataStartTime = ts;
// generate file name
String path = directory;
try {
path += "/testdata" + String.valueOf(i) + ".csv";
getDataInOneFile(path, rowsPerDevice, numOfDevPerFile, humidityDistRadius, tempDistRadius);
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static void getDataInOneFile(String path, int rowsPerDevice, int num, int humidityDistRadius, int tempDistRadius) throws IOException {
DecimalFormat df = new DecimalFormat("0.0000");
long startTime = dataStartTime;
FileWriter fw = new FileWriter(new File(path));
BufferedWriter bw = new BufferedWriter(fw);
for (int i = 0; i < num; ++i) {
deviceId += 1;
Random rand = new Random();
double centralVal = Math.abs(rand.nextInt(100));
if (centralVal < humidityDistRadius) {
centralVal = humidityDistRadius;
}
if (centralVal + humidityDistRadius > 100) {
centralVal = 100 - humidityDistRadius;
}
DataGenerator.ValueGen humidityDataGen = new DataGenerator.ValueGen((int) centralVal, humidityDistRadius);
dataStartTime = startTime;
centralVal = Math.abs(rand.nextInt(22));
DataGenerator.ValueGen tempDataGen = new DataGenerator.ValueGen((int) centralVal, tempDistRadius);
for (int j = 0; j < rowsPerDevice; ++j) {
int humidity = (int) humidityDataGen.next();
double temp = tempDataGen.next();
int deviceGroup = deviceId % 100;
StringBuffer sb = new StringBuffer();
sb.append(deviceId).append(" ").append(tagPrefix).append(deviceId).append(" ").append(deviceGroup)
.append(" ").append(dataStartTime).append(" ").append(humidity).append(" ")
.append(df.format(temp));
bw.write(sb.toString());
bw.write("\n");
dataStartTime += timestep;
}
}
bw.close();
fw.close();
System.out.printf("file:%s generated\n", path);
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"bufio"
"flag"
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/influxdb1-client/v2"
)
type ProArgs struct {
host string
username string
password string
db string
sql string
dataDir string
filesNum int
writeClients int
rowsPerRequest int
}
type WriteInfo struct {
threadId int
sID int
eID int
}
type StatisInfo struct {
totalRows int64
}
var statis StatisInfo
func main() {
// Configuration
var arguments ProArgs
// Parse options
flag.StringVar(&(arguments.host), "host", "http://localhost:8086", "Server host to connect")
flag.StringVar(&(arguments.db), "db", "db", "DB to insert data")
flag.StringVar(&(arguments.username), "user", "", "Username used to connect to server")
flag.StringVar(&(arguments.password), "pass", "", "Password used to connect to server")
flag.StringVar(&(arguments.sql), "sql", "./sqlCmd.txt", "File name of SQL commands")
flag.StringVar(&(arguments.dataDir), "dataDir", "./testdata", "Raw csv data")
flag.IntVar(&(arguments.filesNum), "numOfFiles", 10, "Number of files int dataDir ")
flag.IntVar(&(arguments.writeClients), "writeClients", 0, "Number of write clients")
flag.IntVar(&(arguments.rowsPerRequest), "rowsPerRequest", 100, "Number of rows per request")
flag.Parse()
statis.totalRows = 0
if arguments.writeClients > 0 {
writeData(&arguments)
} else {
readData(&arguments)
}
}
func writeData(arguments *ProArgs) {
log.Println("write data")
log.Println("---- writeClients:", arguments.writeClients)
log.Println("---- dataDir:", arguments.dataDir)
log.Println("---- numOfFiles:", arguments.filesNum)
log.Println("---- rowsPerRequest:", arguments.rowsPerRequest)
var wg sync.WaitGroup
wg.Add(arguments.writeClients)
st := time.Now()
a := arguments.filesNum / arguments.writeClients
b := arguments.filesNum % arguments.writeClients
last := 0
for i := 0; i < arguments.writeClients; i++ {
var wInfo WriteInfo
wInfo.threadId = i + 1
wInfo.sID = last
if i < b {
wInfo.eID = last + a
} else {
wInfo.eID = last + a - 1
}
last = wInfo.eID + 1
go writeDataImp(&wInfo, &wg, arguments)
}
wg.Wait()
elapsed := time.Since(st)
seconds := float64(elapsed) / float64(time.Second)
log.Println("---- Spent", seconds, "seconds to insert", statis.totalRows, "records, speed:", float64(statis.totalRows)/seconds, "Rows/Second")
}
func writeDataImp(wInfo *WriteInfo, wg *sync.WaitGroup, arguments *ProArgs) {
defer wg.Done()
log.Println("Thread", wInfo.threadId, "writing sID", wInfo.sID, "eID", wInfo.eID)
// Connect to the server
conn, err := client.NewHTTPClient(client.HTTPConfig{
Addr: arguments.host,
Username: arguments.username,
Password: arguments.password,
Timeout: 300 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// Create database
_, err = queryDB(conn, fmt.Sprintf("create database %s", arguments.db), arguments.db)
if err != nil {
log.Fatal(err)
}
// Write data
counter := 0
totalRecords := 0
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: arguments.db,
Precision: "ms",
})
if err != nil {
log.Fatal(err)
}
for j := wInfo.sID; j <= wInfo.eID; j++ {
fileName := fmt.Sprintf("%s/testdata%d.csv", arguments.dataDir, j)
fs, err := os.Open(fileName)
if err != nil {
log.Printf("failed to open file %s", fileName)
log.Fatal(err)
}
log.Printf("open file %s success", fileName)
bfRd := bufio.NewReader(fs)
for {
sline, err := bfRd.ReadString('\n')
if err != nil {
break
}
sline = strings.TrimSuffix(sline, "\n")
s := strings.Split(sline, " ")
if len(s) != 6 {
continue
}
// Create a point and add to batch
tags := map[string]string{
"devid": s[0],
"devname": s[1],
"devgroup": s[2],
}
timestamp, _ := strconv.ParseInt(s[3], 10, 64)
temperature, _ := strconv.ParseInt(s[4], 10, 32)
humidity, _ := strconv.ParseFloat(s[5], 64)
fields := map[string]interface{}{
"temperature": temperature,
"humidity": humidity,
}
pt, err := client.NewPoint("devices", tags, fields, time.Unix(0, timestamp * int64(time.Millisecond)))
if err != nil {
log.Fatalln("Error: ", err)
}
bp.AddPoint(pt)
counter++
if counter >= arguments.rowsPerRequest {
if err := conn.Write(bp); err != nil {
log.Fatal(err)
}
totalRecords += counter
counter = 0
bp, err = client.NewBatchPoints(client.BatchPointsConfig{
Database: arguments.db,
Precision: "ms",
})
if err != nil {
log.Fatal(err)
}
}
}
fs.Close()
}
totalRecords += counter
if counter > 0 {
if err := conn.Write(bp); err != nil {
log.Fatal(err)
}
}
atomic.AddInt64(&statis.totalRows, int64(totalRecords))
}
func readData(arguments *ProArgs) {
log.Println("read data")
log.Println("---- sql:", arguments.sql)
conn, err := client.NewHTTPClient(client.HTTPConfig{
Addr: arguments.host,
Username: arguments.username,
Password: arguments.password,
Timeout: 300 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fs, err := os.Open(arguments.sql)
if err != nil {
log.Printf("failed to open file %s", arguments.sql)
log.Fatal(err)
}
log.Printf("open file %s success", arguments.sql)
bfRd := bufio.NewReader(fs)
for {
sline, err := bfRd.ReadString('\n')
if err != nil {
break
}
sline = strings.TrimSuffix(sline, "\n")
st := time.Now()
_, err = queryDB(conn, sline, arguments.db)
if err != nil {
log.Fatal(err)
}
elapsed := time.Since(st)
seconds := float64(elapsed) / float64(time.Second)
log.Println("---- Spent", seconds, "seconds to query ", sline)
}
}
func queryDB(conn client.Client, cmd string, db string) (res []client.Result, err error) {
query := client.Query{
Command: cmd,
Database: db,
}
response, err := conn.Query(query)
if err == nil {
if response.Error() != nil {
return res, response.Error()
}
res = response.Results
} else {
return res, err
}
return res, nil
}
select * from devices where devgroup='0';
select * from devices where devgroup='10';
select * from devices where devgroup='20';
select * from devices where devgroup='30';
select * from devices where devgroup='40';
select * from devices where devgroup='50';
select * from devices where devgroup='60';
select * from devices where devgroup='70';
select * from devices where devgroup='80';
select * from devices where devgroup='90';
select count(temperature) from devices where devgroup=~/[1-1][0-9]/;
select count(temperature) from devices where devgroup=~/[1-2][0-9]/;
select count(temperature) from devices where devgroup=~/[1-3][0-9]/;
select count(temperature) from devices where devgroup=~/[1-4][0-9]/;
select count(temperature) from devices where devgroup=~/[1-5][0-9]/;
select count(temperature) from devices where devgroup=~/[1-6][0-9]/;
select count(temperature) from devices where devgroup=~/[1-7][0-9]/;
select count(temperature) from devices where devgroup=~/[1-8][0-9]/;
select count(temperature) from devices where devgroup=~/[1-9][0-9]/;
select count(temperature) from devices;
select mean(temperature) from devices where devgroup=~/[1-1][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-2][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-3][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-4][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-5][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-6][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-7][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-8][0-9]/;
select mean(temperature) from devices where devgroup=~/[1-9][0-9]/;
select mean(temperature) from devices;
select sum(temperature) from devices where devgroup=~/[1-1][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-2][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-3][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-4][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-5][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-6][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-7][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-8][0-9]/;
select sum(temperature) from devices where devgroup=~/[1-9][0-9]/;
select sum(temperature) from devices;
select max(temperature) from devices where devgroup=~/[1-1][0-9]/;
select max(temperature) from devices where devgroup=~/[1-2][0-9]/;
select max(temperature) from devices where devgroup=~/[1-3][0-9]/;
select max(temperature) from devices where devgroup=~/[1-4][0-9]/;
select max(temperature) from devices where devgroup=~/[1-5][0-9]/;
select max(temperature) from devices where devgroup=~/[1-6][0-9]/;
select max(temperature) from devices where devgroup=~/[1-7][0-9]/;
select max(temperature) from devices where devgroup=~/[1-8][0-9]/;
select max(temperature) from devices where devgroup=~/[1-9][0-9]/;
select max(temperature) from devices;
select min(temperature) from devices where devgroup=~/[1-1][0-9]/;
select min(temperature) from devices where devgroup=~/[1-2][0-9]/;
select min(temperature) from devices where devgroup=~/[1-3][0-9]/;
select min(temperature) from devices where devgroup=~/[1-4][0-9]/;
select min(temperature) from devices where devgroup=~/[1-5][0-9]/;
select min(temperature) from devices where devgroup=~/[1-6][0-9]/;
select min(temperature) from devices where devgroup=~/[1-7][0-9]/;
select min(temperature) from devices where devgroup=~/[1-8][0-9]/;
select min(temperature) from devices where devgroup=~/[1-9][0-9]/;
select min(temperature) from devices;
select spread(temperature) from devices where devgroup=~/[1-1][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-2][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-3][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-4][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-5][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-6][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-7][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-8][0-9]/;
select spread(temperature) from devices where devgroup=~/[1-9][0-9]/;
select spread(temperature) from devices;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-1][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-2][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-3][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-4][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-5][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-6][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-7][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-8][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-9][0-9]/ group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices group by devgroup;
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-1][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-2][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-3][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-4][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-5][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-6][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-7][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-8][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices where devgroup=~/[1-9][0-9]/ group by time(1m);
select count(temperature), sum(temperature), mean(temperature) from devices group by time(1m);
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.opentsdb.test</groupId>
<artifactId>opentsdbtest</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-plugins</artifactId>
<version>30</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<mainClass>OpentsdbTest</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<name>opentsdbtest</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>net.opentsdb</groupId>
<artifactId>opentsdb_gwt_theme</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>net.opentsdb</groupId>
<artifactId>opentsdb</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.0-jre</version>
</dependency>
<dependency>
<groupId>com.google.gwt</groupId>
<artifactId>gwt-user</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.9.10</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.10</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.0.pr1</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.10.6.Final</version>
</dependency>
<dependency>
<groupId>com.stumbleupon</groupId>
<artifactId>async</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-jexl</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.jgrapht</groupId>
<artifactId>jgrapht-core</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>2.21.1</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-mapper-runtime</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>com.github.eulery</groupId>
<artifactId>opentsdb-java-sdk</artifactId>
<version>1.1.4</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>hbase</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.hbase</groupId>
<artifactId>asynchbase</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
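// OpentsdbTest.java: benchmark driver for the comparison test. With -numofFiles > 0 it
// spawns WriteThread workers that push CSV test data to OpenTSDB via /api/put; otherwise
// it times the q1-q4 query shapes (none/count/avg/sum/max/min, group-by, downsample)
// against /api/query.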
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import net.opentsdb.core.TSDB;
import net.opentsdb.uid.NoSuchUniqueName;
import net.opentsdb.uid.UniqueId.UniqueIdType;
import net.opentsdb.utils.Config;
import java.net.URL;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.ResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.ArrayList;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.math.*;
import java.lang.reflect.Method;
import org.apache.log4j.Logger;
import org.apache.log4j.LogManager;
import org.apache.log4j.Level;
public class OpentsdbTest{
//static { System.setProperty("logback.configurationFile", "/home/ubuntu/fang/opentsdb/opentsdbtest/logback.xml");}
static { System.setProperty("logback.configurationFile", "/etc/opentsdb/logback.xml");}
public static void main(String args[]) {
Logger logger = LogManager.getLogger(OpentsdbTest.class);
logger.setLevel(Level.OFF);
// begin to parse argument
String datadir = "/home/ubuntu/testdata";
String sqlchoice = "q1";
int numOfRows = 1000000;
int numOfFiles = 0;
int numOfClients = 1;
int rowsPerRequest = 1;
for (int i = 0; i < args.length; ++i) {
if (args[i].equalsIgnoreCase("-dataDir")) {
if (i < args.length - 1) {
datadir = args[++i];
}
} else if (args[i].equalsIgnoreCase("-numofFiles")) {
if (i < args.length - 1) {
numOfFiles = Integer.parseInt(args[++i]);
}
} else if (args[i].equalsIgnoreCase("-rowsPerRequest")) {
if (i < args.length - 1) {
rowsPerRequest = Integer.parseInt(args[++i]);
}
} else if (args[i].equalsIgnoreCase("-writeClients")) {
if (i < args.length - 1) {
numOfClients = Integer.parseInt(args[++i]);
}
} else if (args[i].equalsIgnoreCase("-sql")) {
sqlchoice = args[++i];
}
}
System.out.println("parameters:\n");
if (numOfFiles >0) {
// write data
System.out.printf("----dataDir:%s\n", datadir);
System.out.printf("----numOfFiles:%d\n", numOfFiles);
System.out.printf("----numOfClients:%d\n", numOfClients);
System.out.printf("----rowsPerRequest:%d\n", rowsPerRequest);
try {
// begin to insert data
System.out.printf("----begin to insert data\n");
long startTime = System.currentTimeMillis();
int a = numOfFiles/numOfClients;
int b = numOfFiles%numOfClients;
int last = 0;
WriteThread[] writethreads = new WriteThread[numOfClients];
int[] wargs = new int[2]; // data file start, end
wargs[0] = numOfRows; //rows to be read from each file
wargs[1] = rowsPerRequest;
int fstart =0;
int fend =0;
for (int i = 0; i<numOfClients; ++i) {
if (i<b) {
fstart = last;
fend = last+a;
last = last+a+1;
writethreads[i] = new WriteThread(fstart,fend,wargs,datadir);
System.out.printf("----Thread %d begin to write\n",i);
writethreads[i].start();
} else {
fstart = last;
fend = last+a-1;
last = last+a;
writethreads[i] = new WriteThread(fstart,fend,wargs,datadir);
System.out.printf("----Thread %d begin to write\n",i);
writethreads[i].start();
}
}
for (int i =0; i<numOfClients; ++i) {
try {
writethreads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long stopTime = System.currentTimeMillis();
float elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
float speeds = numOfRows*numOfFiles/elapseTime;
System.out.printf("---- insertation speed: %f Rows/Second\n",speeds);
} catch (Exception ex) {
ex.printStackTrace();
System.exit(1);
} finally {
System.out.printf("---- insertion end\n");
}
            // above: write part; below: read part
} else {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
String filter_reg;
String get_url;
long startTime;
long stopTime;
float elapseTime;
CloseableHttpResponse responseBody;
StringEntity stringEntity;
HttpPost httpPost;
String qjson;
for (int ig = 10; ig <110; ig = ig+10) {
if (ig == 10) {
filter_reg = "\\b[0-9]\\b";
} else {
filter_reg = "\\b" + "([0-9]|"
+ "[" + "1" + "-"
+ Integer.toString(ig/10-1) + "][0-9])" +"\\b";
}
switch (sqlchoice) {
case "q1":
get_url = "http://127.0.0.1:4242/api/query?";
/*
get_url = get_url + "start=1563249700&m=none:temperature{devgroup=";
get_url = get_url + String.valueOf(ig-10) +"}";
*/
startTime = System.currentTimeMillis();
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"none\",\n" +
" \"metric\": \"temperature\",\n" +
" \"tags\": {\n" +
" \"devgroup\": " + "\"" + Integer.toString(ig-10) + "\"" + "\n" +
" }\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
/*
System.out.println(responseBody.getStatusLine());
System.out.println(qjson);
*/
responseBody.close();
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to get data when devgroup = %d\n",elapseTime, ig-10);
break;
case "q2":
//count
startTime = System.currentTimeMillis();
get_url = "http://127.0.0.1:4242/api/query?";
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"count\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupby\": false\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to count data when devgroup < %d\n",elapseTime, ig);
responseBody.close();
//avg
startTime = System.currentTimeMillis();
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"avg\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupby\": false\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to avg data when devgroup < %d\n",elapseTime, ig);
responseBody.close();
//sum
startTime = System.currentTimeMillis();
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"sum\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" +",\n" +
" \"groupby\": false\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to sum data when devgroup < %d\n",elapseTime, ig);
responseBody.close();
//max
startTime = System.currentTimeMillis();
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"max\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupby\": false\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to max data when devgroup < %d\n",elapseTime, ig);
responseBody.close();
//min
startTime = System.currentTimeMillis();
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"min\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupby\": false\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
                            responseBody = httpclient.execute(httpPost);
                            stopTime = System.currentTimeMillis();
                            elapseTime = stopTime - startTime;
                            elapseTime = elapseTime/1000;
                            System.out.printf("Spend %f seconds to min data when devgroup < %d\n",elapseTime, ig);
                            responseBody.close();
break;
case "q3":
startTime = System.currentTimeMillis();
get_url = "http://127.0.0.1:4242/api/query?";
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"count\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupBy\": true\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"aggregator\": \"sum\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupBy\": true\n" +
" }\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"aggregator\": \"avg\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupBy\": true\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
/*
System.out.println(responseBody.getStatusLine());
System.out.println(qjson);
*/
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to group data by devgroup when devgroup < %d\n",elapseTime, ig);
responseBody.close();
break;
case "q4":
startTime = System.currentTimeMillis();
get_url = "http://127.0.0.1:4242/api/query?";
httpPost = new HttpPost(get_url);
qjson = " {\n" +
" \"start\": 1563249700,\n" +
" \"queries\": [\n" +
" {\n" +
" \"aggregator\": \"none\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupBy\": false\n" +
" }\n" +
" ],\n" +
" \"downsample\": \"1m-sum\"\n" +
" },\n" +
" {\n" +
" \"aggregator\": \"none\",\n" +
" \"metric\": \"temperature\",\n" +
" \"filters\": [\n"+
" {\n" +
" \"type\": \"regexp\",\n" +
" \"tagk\": \"devgroup\",\n" +
" \"filter\": " +"\"" + filter_reg +"\"" + ",\n" +
" \"groupBy\": false\n" +
" }\n" +
" ],\n" +
" \"downsample\": \"1m-avg\"\n" +
" }\n" +
" ]\n" +
" }";
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
stringEntity = new StringEntity(qjson);
httpPost.setEntity(stringEntity);
responseBody = httpclient.execute(httpPost);
/*
System.out.println(responseBody.getStatusLine());
System.out.println(qjson);
*/
stopTime = System.currentTimeMillis();
elapseTime = stopTime - startTime;
elapseTime = elapseTime/1000;
System.out.printf("Spend %f seconds to group data by time when devgroup < %d\n",elapseTime, ig);
responseBody.close();
break;
}
}
httpclient.close();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("query end:----\n");
} // end write or query
System.exit(0);
}// end main
}// end class
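// The query payloads above are assembled by hand-concatenating JSON strings, which is easy
// to get subtly wrong (trailing commas, unescaped quotes). Below is a minimal sketch of the
// same q2 "count" payload built with the jackson-databind dependency already declared in the
// pom; the class name QueryJsonSketch is hypothetical and is not part of the benchmark.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class QueryJsonSketch {
    public static String countQuery(String filterReg) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode root = mapper.createObjectNode();
        root.put("start", 1563249700L);          // same epoch start as the benchmark
        ObjectNode q = root.putArray("queries").addObject();
        q.put("aggregator", "count");
        q.put("metric", "temperature");
        ObjectNode f = q.putArray("filters").addObject();
        f.put("type", "regexp");                 // regexp filter on the devgroup tag
        f.put("tagk", "devgroup");
        f.put("filter", filterReg);
        f.put("groupBy", false);
        return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(root);
    }
}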
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import net.opentsdb.core.TSDB;
import net.opentsdb.uid.NoSuchUniqueName;
import net.opentsdb.uid.UniqueId.UniqueIdType;
import net.opentsdb.utils.Config;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.*;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.util.EntityUtils;
import java.io.BufferedWriter;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.math.*;
import java.lang.reflect.Method;
public class WriteThread extends Thread {
    private int[] wargs; // wargs[0]: rows to read per file, wargs[1]: rows per request
private String fdir;
private int fstart;
private int fend;
public WriteThread (int fstart, int fend,int[] wargs, String fdir) {
this.fstart = fstart;
this.fend = fend;
this.fdir = fdir;
this.wargs = wargs;
}
// begin to insert in this thread
public void run() {
StringEntity stringEntity;
String port = "4242";
String put_url = "http://127.0.0.1:"+port+"/api/put?summary";
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
/*
httpclient.getHttpConnectionManager().getParams()
.setConnectionTimeout(1000);
httpclient.getHttpConnectionManager().getParams()
.setSoTimeout(5000);
*/
for (int i=fstart; i<=fend; i++) {
String csvfile;
csvfile = fdir + "/testdata"+ Integer.toString(i)+".csv";
BufferedReader br = null;
String line = "";
                String csvSplitBy = " ";
try {
br = new BufferedReader(new FileReader(csvfile));
System.out.println("---- begin to read file " +csvfile+"\n");
for (int itotalrow =0; itotalrow<wargs[0]; itotalrow=itotalrow+wargs[1]) {
HttpPost httpPost = new HttpPost(put_url);
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
String totaljson = "[\n";
for (int irow =0; irow<wargs[1]; ++irow) {
line = br.readLine();
if (line !=null) {
                                String[] meter = line.split(csvSplitBy);
// devid, devname,devgroup,ts,temperature,humidity
BigInteger timestamp = new BigInteger(meter[3]);
timestamp = timestamp.divide(BigInteger.valueOf(1000));
long ts = timestamp.longValue();
int temperature = Integer.parseInt(meter[4]);
float humidity = Float.parseFloat(meter[5]);
String onejson = " {\n" +
" \"metric\": \"temperature\",\n" +
" \"timestamp\": " + String.valueOf(ts) + ",\n" +
" \"value\": " + String.valueOf(temperature) + ",\n" +
" \"tags\" : {\n" +
" \"devid\":" +" \"" + meter[0] + "\",\n" +
" \"devname\":" +" \"" + meter[1] + "\",\n" +
" \"devgroup\":" +" \"" + meter[2] + "\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"metric\": \"humidity\",\n" +
" \"timestamp\": " + String.valueOf(ts) + ",\n" +
" \"value\": " + String.valueOf(humidity) + ",\n" +
" \"tags\" : {\n" +
" \"devid\":" +" \"" + meter[0] + "\",\n" +
" \"devname\":" +" \"" + meter[1] + "\",\n" +
" \"devgroup\":" +" \"" + meter[2] + "\"\n" +
" }\n";
if (irow == 0) {
totaljson = totaljson + onejson;
} else if (irow < wargs[1]) {
totaljson = totaljson + " },\n" + onejson;
}
} //end one line reading
                        } //end one batch put
totaljson = totaljson + " }\n]";
stringEntity = new StringEntity(totaljson);
httpPost.setEntity(stringEntity);
CloseableHttpResponse responseBody = httpclient.execute(httpPost);
/*
System.out.println(responseBody.getStatusLine());
System.out.println(totaljson);
*/
responseBody.close();
}// end one file reading
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}//end file iteration
httpclient.close();
} catch (Exception e) {
e.printStackTrace();
System.out.println("failed to connect");
}
}//end run
}//end class
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
	<!-- Log files go to the mobileLog folder under Tomcat's logs directory; logback creates
		the folder automatically, so this setting alone is enough to enable file output:
		<substitutionProperty name="logbase" value="${catalina.base}/logs/mobileLog/"
		/> -->
	<substitutionProperty name="logbase" value="${user.dir}/logs/" />
	<!-- This configures the output file location -->
<jmxConfigurator />
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%date [%thread] %-5level %logger{80} - %msg%n</pattern>
</layout>
</appender>
	<!-- File log appender (size-based rolling: the file is archived once it exceeds the configured size) -->
<appender name="logfile"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<Encoding>UTF-8</Encoding>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<File>${logbase}%d{yyyy-MM-dd}.log.html</File>
<FileNamePattern>${logbase}.%d{yyyy-MM-dd}.log.html.zip
</FileNamePattern>
</rollingPolicy>
<triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<MaxFileSize>2MB</MaxFileSize>
</triggeringPolicy>
<layout class="ch.qos.logback.classic.html.HTMLLayout">
<pattern>%date%level%thread%10logger%file%line%msg</pattern>
</layout>
</appender>
<!-- Output by Email -->
<!--
<appender name="Email" class="ch.qos.logback.classic.net.SMTPAppender">
<SMTPHost>stmp host name</SMTPHost>
<To>Email Address</To>
<To>Email Address</To>
<From>Email Address</From>
<Subject>TESTING Email Function: %logger{20} - %m</Subject>
<layout class="ch.qos.logback.classic.html.HTMLLayout">
<pattern>%date%level%thread%10logger%file%line%msg</pattern>
</layout>
</appender> -->
<!-- Output to Database -->
<!--
<appender name="DB" class="ch.qos.logback.classic.db.DBAppender">
<connectionSource class="ch.qos.logback.core.db.DriverManagerConnectionSource">
<driverClass>com.mysql.jdbc.Driver</driverClass>
<url>jdbc:mysql://localhost:3306/test</url>
<user>root</user>
<password>trend_dev</password>
</connectionSource>
</appender> -->
<root>
<level value="debug" />
<appender-ref ref="logfile" />
<appender-ref ref="logfile" />
</root>
</configuration>
CMAKE_MINIMUM_REQUIRED(VERSION 3.0...3.20)
PROJECT(TDengine)
IF (TD_LINUX)
add_executable(tdengineTest tdengineTest.c)
target_link_libraries(tdengineTest taos_static tutil common pthread)
ENDIF()
IF (TD_DARWIN)
add_executable(tdengineTest tdengineTest.c)
target_link_libraries(tdengineTest taos_static tutil common pthread)
ENDIF()
ROOT=./
TARGET=exe
LFLAGS = '-Wl,-rpath,/usr/lib' -ltaos -lpthread -lm -lrt
CFLAGS = -O3 -g -Wall -Wno-deprecated -fPIC -Wno-unused-result -Wconversion -Wno-char-subscripts -D_REENTRANT -Wno-format -D_REENTRANT -DLINUX -msse4.2 -Wno-unused-function -D_M_X64 -std=gnu99
all: $(TARGET)
exe:
gcc $(CFLAGS) ./tdengineTest.c -o $(ROOT)/tdengineTest $(LFLAGS)
clean:
rm $(ROOT)tdengineTest
\ No newline at end of file
select * from db.devices where devgroup=0;
select * from db.devices where devgroup=10;
select * from db.devices where devgroup=20;
select * from db.devices where devgroup=30;
select * from db.devices where devgroup=40;
select * from db.devices where devgroup=50;
select * from db.devices where devgroup=60;
select * from db.devices where devgroup=70;
select * from db.devices where devgroup=80;
select * from db.devices where devgroup=90;
select count(*) from db.devices where devgroup<10;
select count(*) from db.devices where devgroup<20;
select count(*) from db.devices where devgroup<30;
select count(*) from db.devices where devgroup<40;
select count(*) from db.devices where devgroup<50;
select count(*) from db.devices where devgroup<60;
select count(*) from db.devices where devgroup<70;
select count(*) from db.devices where devgroup<80;
select count(*) from db.devices where devgroup<90;
select count(*) from db.devices;
select avg(temperature) from db.devices where devgroup<10;
select avg(temperature) from db.devices where devgroup<20;
select avg(temperature) from db.devices where devgroup<30;
select avg(temperature) from db.devices where devgroup<40;
select avg(temperature) from db.devices where devgroup<50;
select avg(temperature) from db.devices where devgroup<60;
select avg(temperature) from db.devices where devgroup<70;
select avg(temperature) from db.devices where devgroup<80;
select avg(temperature) from db.devices where devgroup<90;
select avg(temperature) from db.devices;
select sum(temperature) from db.devices where devgroup<10;
select sum(temperature) from db.devices where devgroup<20;
select sum(temperature) from db.devices where devgroup<30;
select sum(temperature) from db.devices where devgroup<40;
select sum(temperature) from db.devices where devgroup<50;
select sum(temperature) from db.devices where devgroup<60;
select sum(temperature) from db.devices where devgroup<70;
select sum(temperature) from db.devices where devgroup<80;
select sum(temperature) from db.devices where devgroup<90;
select sum(temperature) from db.devices;
select max(temperature) from db.devices where devgroup<10;
select max(temperature) from db.devices where devgroup<20;
select max(temperature) from db.devices where devgroup<30;
select max(temperature) from db.devices where devgroup<40;
select max(temperature) from db.devices where devgroup<50;
select max(temperature) from db.devices where devgroup<60;
select max(temperature) from db.devices where devgroup<70;
select max(temperature) from db.devices where devgroup<80;
select max(temperature) from db.devices where devgroup<90;
select max(temperature) from db.devices;
select min(temperature) from db.devices where devgroup<10;
select min(temperature) from db.devices where devgroup<20;
select min(temperature) from db.devices where devgroup<30;
select min(temperature) from db.devices where devgroup<40;
select min(temperature) from db.devices where devgroup<50;
select min(temperature) from db.devices where devgroup<60;
select min(temperature) from db.devices where devgroup<70;
select min(temperature) from db.devices where devgroup<80;
select min(temperature) from db.devices where devgroup<90;
select min(temperature) from db.devices;
select spread(temperature) from db.devices where devgroup<10;
select spread(temperature) from db.devices where devgroup<20;
select spread(temperature) from db.devices where devgroup<30;
select spread(temperature) from db.devices where devgroup<40;
select spread(temperature) from db.devices where devgroup<50;
select spread(temperature) from db.devices where devgroup<60;
select spread(temperature) from db.devices where devgroup<70;
select spread(temperature) from db.devices where devgroup<80;
select spread(temperature) from db.devices where devgroup<90;
select spread(temperature) from db.devices;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<10 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<20 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<30 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<40 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<50 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<60 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<70 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<80 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<90 group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices group by devgroup;
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<10 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<20 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<30 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<40 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<50 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<60 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<70 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<80 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices where devgroup<90 interval(1m);
select count(temperature), sum(temperature), avg(temperature) from db.devices interval(1m);
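/*
 * tdengineTest.c: TDengine side of the comparison test. With -w it creates db.devices,
 * pre-creates the sub-tables, and bulk-inserts the CSV test data with one writer thread
 * per client; without -w it replays each line of the -sql file (sqlCmd.txt above) and
 * reports per-query latency.
 */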
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdint.h>
#include <taos.h>
#include <time.h>
#include <pthread.h>
#include <sys/time.h>
#include <inttypes.h>
typedef struct {
char sql[256];
char dataDir[256];
int filesNum;
int clients;
int rowsPerRequest;
int write;
} ProArgs;
typedef struct {
int64_t totalRows;
} StatisInfo;
typedef struct {
pthread_t pid;
int threadId;
int sID;
int eID;
} ThreadObj;
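/* threadId: 1-based worker id; sID/eID: inclusive range of testdata<N>.csv file indices assigned to this thread */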
static StatisInfo statis;
static ProArgs arguments;
void parseArg(int argc, char *argv[]);
void writeData();
void readData();
int main(int argc, char *argv[]) {
statis.totalRows = 0;
parseArg(argc, argv);
if (arguments.write) {
writeData();
} else {
readData();
}
}
void parseArg(int argc, char *argv[]) {
strcpy(arguments.sql, "./sqlCmd.txt");
strcpy(arguments.dataDir, "./testdata");
arguments.filesNum = 2;
arguments.clients = 1;
arguments.rowsPerRequest = 100;
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-sql") == 0) {
if (i < argc - 1) {
strcpy(arguments.sql, argv[++i]);
}
else {
fprintf(stderr, "'-sql' requires a parameter, default:%s\n", arguments.sql);
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-dataDir") == 0) {
if (i < argc - 1) {
strcpy(arguments.dataDir, argv[++i]);
}
else {
fprintf(stderr, "'-dataDir' requires a parameter, default:%s\n", arguments.dataDir);
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-numOfFiles") == 0) {
if (i < argc - 1) {
arguments.filesNum = atoi(argv[++i]);
}
else {
fprintf(stderr, "'-numOfFiles' requires a parameter, default:%d\n", arguments.filesNum);
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-clients") == 0) {
if (i < argc - 1) {
arguments.clients = atoi(argv[++i]);
}
else {
fprintf(stderr, "'-clients' requires a parameter, default:%d\n", arguments.clients);
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-rowsPerRequest") == 0) {
if (i < argc - 1) {
arguments.rowsPerRequest = atoi(argv[++i]);
}
else {
fprintf(stderr, "'-rowsPerRequest' requires a parameter, default:%d\n", arguments.rowsPerRequest);
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-w") == 0) {
arguments.write = 1;
}
}
}
static void taos_error(TAOS_RES *tres, TAOS *conn) {
printf("TDengine error: %s\n", tres?taos_errstr(tres):"null result");
taos_close(conn);
exit(1);
}
int64_t getTimeStampMs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
return (int64_t)systemTime.tv_sec * 1000L + (int64_t)systemTime.tv_usec / 1000;
}
void writeDataImp(void *param) {
ThreadObj *pThread = (ThreadObj *)param;
printf("Thread %d, writing sID %d, eID %d\n", pThread->threadId, pThread->sID, pThread->eID);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL) {
// where to find errstr?
// taos_error(NULL, taos);
printf("TDengine error: %s\n", "failed to connect");
exit(1);
}
TAOS_RES* result = taos_query(taos, "use db");
int32_t code = taos_errno(result);
if (code != 0) {
taos_error(result, taos);
}
taos_free_result(result);
char *sql = calloc(1, 8*1024*1024);
int sqlLen = 0;
int lastMachineid = 0;
int counter = 0;
int totalRecords = 0;
for (int j = pThread->sID; j <= pThread->eID; j++) {
char fileName[300];
sprintf(fileName, "%s/testdata%d.csv", arguments.dataDir, j);
FILE *fp = fopen(fileName, "r");
if (fp == NULL) {
printf("failed to open file %s\n", fileName);
exit(1);
}
printf("open file %s success\n", fileName);
char *line = NULL;
size_t len = 0;
while (!feof(fp)) {
free(line);
line = NULL;
len = 0;
getline(&line, &len, fp);
if (line == NULL) break;
if (strlen(line) < 10) continue;
int machineid;
char machinename[16];
int machinegroup;
int64_t timestamp;
int temperature;
float humidity;
sscanf(line, "%d%s%d%" PRId64 "%d%f", &machineid, machinename, &machinegroup, &timestamp, &temperature, &humidity);
if (counter == 0) {
sqlLen = sprintf(sql, "insert into");
}
if (lastMachineid != machineid) {
lastMachineid = machineid;
sqlLen += sprintf(sql + sqlLen, " dev%d values",
machineid);
}
sqlLen += sprintf(sql + sqlLen, "(%" PRId64 ",%d,%f)", timestamp, temperature, humidity);
counter++;
if (counter >= arguments.rowsPerRequest) {
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
printf("insert into dev%d values (%" PRId64 ",%d,%f)\n",machineid, timestamp, temperature, humidity);
printf("thread:%d error:%d reason:%s\n", pThread->threadId, code, taos_errstr(result));
}
taos_free_result(result);
totalRecords += counter;
counter = 0;
lastMachineid = -1;
sqlLen = 0;
}
}
fclose(fp);
}
if (counter > 0) {
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
// printf("insert into dev%d using devices tags(%d,'%s',%d) values (%" PRId64 ",%d,%f)",machineid, machineid, machinename, machinegroup, timestamp, temperature, humidity);
printf("thread:%d error:%d reason:%s\n", pThread->threadId, code, taos_errstr(taos));
}
taos_free_result(result);
totalRecords += counter;
}
__sync_fetch_and_add(&statis.totalRows, totalRecords);
free(sql);
}
void writeData() {
printf("write data\n");
printf("---- clients: %d\n", arguments.clients);
printf("---- dataDir: %s\n", arguments.dataDir);
printf("---- numOfFiles: %d\n", arguments.filesNum);
printf("---- rowsPerRequest: %d\n", arguments.rowsPerRequest);
taos_init();
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL) {
// where to find errstr?
// taos_error(NULL, taos);
printf("TDengine error: %s\n", "failed to connect");
exit(1);
}
TAOS_RES *result = taos_query(taos, "create database if not exists db");
int32_t code = taos_errno(result);
if (code != 0) {
taos_error(result, taos);
}
taos_free_result(result);
result = taos_query(taos,
"create stable if not exists db.devices(ts timestamp, temperature int, humidity float) "
"tags(devid int, devname binary(16), devgroup int)");
code = taos_errno(result);
if (code != 0) {
taos_error(result, taos);
}
taos_free_result(result);
  // create tables before inserting the data
result = taos_query(taos, "use db");
code = taos_errno(result);
if (code != 0) {
taos_error(result, taos);
}
taos_free_result(result);
char *sql = calloc(1, 8*1024*1024);
int sqlLen = 0;
int lastMachineid = 0;
int counter = 0;
int totalRecords = 0;
for (int i = 0; i < arguments.filesNum; i++) {
char fileName[300];
sprintf(fileName, "%s/testdata%d.csv", arguments.dataDir, i);
FILE *fp = fopen(fileName, "r");
if (fp == NULL) {
printf("failed to open file %s\n", fileName);
exit(1);
}
printf("open file %s success\n", fileName);
char *line = NULL;
size_t len = 0;
while (!feof(fp)) {
free(line);
line = NULL;
len = 0;
getline(&line, &len, fp);
if (line == NULL) break;
if (strlen(line) < 10) continue;
int machineid;
char machinename[16];
int machinegroup;
int64_t timestamp;
int temperature;
float humidity;
sscanf(line, "%d%s%d%" PRId64 "%d%f", &machineid, machinename, &machinegroup, &timestamp, &temperature, &humidity);
if (counter == 0) {
sqlLen = sprintf(sql, "create table if not exists");
}
if (lastMachineid != machineid) {
lastMachineid = machineid;
sqlLen += sprintf(sql + sqlLen, " dev%d using devices tags(%d,'%s',%d)", machineid, machineid, machinename, machinegroup);
}
counter++;
if (counter >= arguments.rowsPerRequest) {
result = taos_query(taos, sql);
code = taos_errno(result);
if (code != 0) {
printf("create table error:%d reason:%s\n", code, taos_errstr(result));
}
taos_free_result(result);
totalRecords += counter;
counter = 0;
lastMachineid = -1;
sqlLen = 0;
}
}
fclose(fp);
}
int64_t st = getTimeStampMs();
int a = arguments.filesNum / arguments.clients;
int b = arguments.filesNum % arguments.clients;
int last = 0;
ThreadObj *threads = calloc((size_t)arguments.clients, sizeof(ThreadObj));
for (int i = 0; i < arguments.clients; ++i) {
ThreadObj *pthread = threads + i;
pthread_attr_t thattr;
pthread->threadId = i + 1;
pthread->sID = last;
if (i < b) {
pthread->eID = last + a;
} else {
pthread->eID = last + a - 1;
}
last = pthread->eID + 1;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&pthread->pid, &thattr, (void *(*)(void *))writeDataImp, pthread);
}
for (int i = 0; i < arguments.clients; i++) {
pthread_join(threads[i].pid, NULL);
}
int64_t elapsed = getTimeStampMs() - st;
float seconds = (float)elapsed / 1000;
float rs = (float)statis.totalRows / seconds;
free(threads);
printf("---- Spent %f seconds to insert %" PRId64 " records, speed: %f Rows/Second\n", seconds, statis.totalRows, rs);
}
void readDataImp(void *param)
{
ThreadObj *pThread = (ThreadObj *)param;
printf("Thread %d\n", pThread->threadId);
FILE *fp = fopen(arguments.sql, "r");
if (fp == NULL) {
printf("failed to open file %s\n", arguments.sql);
exit(1);
}
printf("open file %s success\n", arguments.sql);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL) {
// where to find errstr?
// taos_error(NULL, taos);
printf("TDengine error: %s\n", "failed to connect");
exit(1);
}
char *line = NULL;
size_t len = 0;
while (!feof(fp)) {
free(line);
line = NULL;
len = 0;
getline(&line, &len, fp);
if (line == NULL) break;
if (strlen(line) < 10) continue;
int64_t st = getTimeStampMs();
TAOS_RES *result = taos_query(taos, line);
int32_t code = taos_errno(result);
if (code != 0) {
taos_error(result, taos);
}
TAOS_ROW row;
int rows = 0;
//int num_fields = taos_field_count(taos);
//TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result))) {
rows++;
//char temp[256];
//taos_print_row(temp, row, fields, num_fields);
//printf("%s\n", temp);
}
taos_free_result(result);
int64_t elapsed = getTimeStampMs() - st;
float seconds = (float)elapsed / 1000;
printf("---- Spent %f seconds to retrieve %d records, Thread:%d query: %s\n", seconds, rows, pThread->threadId, line);
}
fclose(fp);
}
void readData() {
printf("read data\n");
printf("---- sql: %s\n", arguments.sql);
printf("---- clients: %d\n", arguments.clients);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL) {
// where to find errstr?
// taos_error(NULL, taos);
printf("TDengine error: %s\n", "failed to connect");
exit(1);
}
ThreadObj *threads = calloc((size_t)arguments.clients, sizeof(ThreadObj));
for (int i = 0; i < arguments.clients; ++i) {
ThreadObj *pthread = threads + i;
pthread_attr_t thattr;
pthread->threadId = i + 1;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&pthread->pid, &thattr, (void *(*)(void *))readDataImp, pthread);
}
for (int i = 0; i < arguments.clients; i++) {
pthread_join(threads[i].pid, NULL);
}
free(threads);
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
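// TDengineDriver.cs: minimal P/Invoke wrapper over the native "taos" client library,
// mapping taos_connect/taos_query/taos_fetch_row and related entry points for the C# examples.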
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
namespace TDengineDriver
{
enum TDengineDataType
{
TSDB_DATA_TYPE_NULL = 0, // 1 bytes
TSDB_DATA_TYPE_BOOL = 1, // 1 bytes
TSDB_DATA_TYPE_TINYINT = 2, // 1 bytes
TSDB_DATA_TYPE_SMALLINT = 3, // 2 bytes
TSDB_DATA_TYPE_INT = 4, // 4 bytes
TSDB_DATA_TYPE_BIGINT = 5, // 8 bytes
TSDB_DATA_TYPE_FLOAT = 6, // 4 bytes
TSDB_DATA_TYPE_DOUBLE = 7, // 8 bytes
TSDB_DATA_TYPE_BINARY = 8, // string
TSDB_DATA_TYPE_TIMESTAMP = 9,// 8 bytes
TSDB_DATA_TYPE_NCHAR = 10, // unicode string
TSDB_DATA_TYPE_UTINYINT = 11,// 1 byte
TSDB_DATA_TYPE_USMALLINT= 12,// 2 bytes
TSDB_DATA_TYPE_UINT = 13, // 4 bytes
TSDB_DATA_TYPE_UBIGINT= 14 // 8 bytes
}
enum TDengineInitOption
{
TSDB_OPTION_LOCALE = 0,
TSDB_OPTION_CHARSET = 1,
TSDB_OPTION_TIMEZONE = 2,
TDDB_OPTION_CONFIGDIR = 3,
TDDB_OPTION_SHELL_ACTIVITY_TIMER = 4
}
class TDengineMeta
{
public string name;
public short size;
public byte type;
public string TypeName()
{
switch ((TDengineDataType)type)
{
case TDengineDataType.TSDB_DATA_TYPE_BOOL:
return "BOOL";
case TDengineDataType.TSDB_DATA_TYPE_TINYINT:
return "TINYINT";
case TDengineDataType.TSDB_DATA_TYPE_SMALLINT:
return "SMALLINT";
case TDengineDataType.TSDB_DATA_TYPE_INT:
return "INT";
case TDengineDataType.TSDB_DATA_TYPE_BIGINT:
return "BIGINT";
case TDengineDataType.TSDB_DATA_TYPE_UTINYINT:
return "TINYINT UNSIGNED";
case TDengineDataType.TSDB_DATA_TYPE_USMALLINT:
return "SMALLINT UNSIGNED";
case TDengineDataType.TSDB_DATA_TYPE_UINT:
return "INT UNSIGNED";
case TDengineDataType.TSDB_DATA_TYPE_UBIGINT:
return "BIGINT UNSIGNED";
case TDengineDataType.TSDB_DATA_TYPE_FLOAT:
return "FLOAT";
case TDengineDataType.TSDB_DATA_TYPE_DOUBLE:
return "DOUBLE";
case TDengineDataType.TSDB_DATA_TYPE_BINARY:
return "STRING";
case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP:
return "TIMESTAMP";
case TDengineDataType.TSDB_DATA_TYPE_NCHAR:
return "NCHAR";
default:
return "undefine";
}
}
}
class TDengine
{
public const int TSDB_CODE_SUCCESS = 0;
[DllImport("taos", EntryPoint = "taos_init", CallingConvention = CallingConvention.Cdecl)]
static extern public void Init();
[DllImport("taos", EntryPoint = "taos_cleanup", CallingConvention = CallingConvention.Cdecl)]
static extern public void Cleanup();
[DllImport("taos", EntryPoint = "taos_options", CallingConvention = CallingConvention.Cdecl)]
static extern public void Options(int option, string value);
[DllImport("taos", EntryPoint = "taos_connect", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr Connect(string ip, string user, string password, string db, short port);
[DllImport("taos", EntryPoint = "taos_errstr", CallingConvention = CallingConvention.Cdecl)]
static extern private IntPtr taos_errstr(IntPtr res);
static public string Error(IntPtr res)
{
IntPtr errPtr = taos_errstr(res);
return Marshal.PtrToStringAnsi(errPtr);
}
[DllImport("taos", EntryPoint = "taos_errno", CallingConvention = CallingConvention.Cdecl)]
static extern public int ErrorNo(IntPtr res);
[DllImport("taos", EntryPoint = "taos_query", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr Query(IntPtr conn, string sqlstr);
[DllImport("taos", EntryPoint = "taos_affected_rows", CallingConvention = CallingConvention.Cdecl)]
static extern public int AffectRows(IntPtr res);
[DllImport("taos", EntryPoint = "taos_field_count", CallingConvention = CallingConvention.Cdecl)]
static extern public int FieldCount(IntPtr res);
[DllImport("taos", EntryPoint = "taos_fetch_fields", CallingConvention = CallingConvention.Cdecl)]
static extern private IntPtr taos_fetch_fields(IntPtr res);
static public List<TDengineMeta> FetchFields(IntPtr res)
{
const int fieldSize = 68;
List<TDengineMeta> metas = new List<TDengineMeta>();
if (res == IntPtr.Zero)
{
return metas;
}
int fieldCount = FieldCount(res);
IntPtr fieldsPtr = taos_fetch_fields(res);
for (int i = 0; i < fieldCount; ++i)
{
int offset = i * fieldSize;
TDengineMeta meta = new TDengineMeta();
meta.name = Marshal.PtrToStringAnsi(fieldsPtr + offset);
meta.type = Marshal.ReadByte(fieldsPtr + offset + 65);
meta.size = Marshal.ReadInt16(fieldsPtr + offset + 66);
metas.Add(meta);
}
return metas;
}
[DllImport("taos", EntryPoint = "taos_fetch_row", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr FetchRows(IntPtr res);
[DllImport("taos", EntryPoint = "taos_free_result", CallingConvention = CallingConvention.Cdecl)]
static extern public IntPtr FreeResult(IntPtr res);
[DllImport("taos", EntryPoint = "taos_close", CallingConvention = CallingConvention.Cdecl)]
static extern public int Close(IntPtr taos);
        // get the precision of the result set
[DllImport("taos", EntryPoint = "taos_result_precision", CallingConvention = CallingConvention.Cdecl)]
static extern public int ResultPrecision(IntPtr taos);
}
}
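// Node.js precision test: exercises nanosecond ("ns"), microsecond ("us") and
// millisecond ("ms") databases through td2.0-connector, checking insert/import
// ordering, range predicates, intervals and timezone-qualified timestamps.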
const taos = require('td2.0-connector');
var conn = taos.connect({host:"localhost", user:"root", password:"taosdata", config:"/etc/taos",port:6030})
var c1 = conn.cursor();
function checkData(sql,row,col,data){
console.log(sql)
c1.execute(sql)
var d = c1.fetchall();
let checkdata = d[row][col];
if (checkdata == data) {
console.log('check pass')
}
else{
console.log('check failed')
console.log('checked is :',checkdata)
console.log("expected is :",data)
}
}
// nano basic case
c1.execute('reset query cache')
c1.execute('drop database if exists db')
c1.execute('create database db precision "ns";')
c1.execute('use db');
c1.execute('create table tb (ts timestamp, speed int)')
c1.execute('insert into tb values(\'2021-06-10 00:00:00.100000001\', 1);')
c1.execute('insert into tb values(1623254400150000000, 2);')
c1.execute('import into tb values(1623254400300000000, 3);')
c1.execute('import into tb values(1623254400299999999, 4);')
c1.execute('insert into tb values(1623254400300000001, 5);')
c1.execute('insert into tb values(1623254400999999999, 7);')
c1.execute('insert into tb values(1623254400123456789, 8);')
sql = 'select * from tb;'
console.log('*******************************************')
console.log('checking data inserted into the ns database')
//check inserted data
checkData(sql,0,0,'2021-06-10 00:00:00.100000001')
checkData(sql,1,0,'2021-06-10 00:00:00.123456789')
checkData(sql,2,0,'2021-06-10 00:00:00.150000000')
checkData(sql,3,0,'2021-06-10 00:00:00.299999999')
checkData(sql,4,0,'2021-06-10 00:00:00.300000000')
checkData(sql,5,0,'2021-06-10 00:00:00.300000001')
checkData(sql,6,0,'2021-06-10 00:00:00.999999999')
checkData(sql,0,1,1)
checkData(sql,1,1,8)
checkData(sql,2,1,2)
checkData(sql,5,1,5)
// us basic case
c1.execute('reset query cache')
c1.execute('drop database if exists usdb')
c1.execute('create database usdb precision "us";')
c1.execute('use usdb');
c1.execute('create table tb (ts timestamp, speed int)')
c1.execute('insert into tb values(\'2021-06-10 00:00:00.100001\', 1);')
c1.execute('insert into tb values(1623254400150000, 2);')
c1.execute('import into tb values(1623254400300000, 3);')
c1.execute('import into tb values(1623254400299999, 4);')
c1.execute('insert into tb values(1623254400300001, 5);')
c1.execute('insert into tb values(1623254400999999, 7);')
c1.execute('insert into tb values(1623254400123789, 8);')
sql = 'select * from tb;'
console.log('*******************************************')
//check inserted data
checkData(sql,0,0,'2021-06-10 00:00:00.100001')
checkData(sql,1,0,'2021-06-10 00:00:00.123789')
checkData(sql,2,0,'2021-06-10 00:00:00.150000')
checkData(sql,3,0,'2021-06-10 00:00:00.299999')
checkData(sql,4,0,'2021-06-10 00:00:00.300000')
checkData(sql,5,0,'2021-06-10 00:00:00.300001')
checkData(sql,6,0,'2021-06-10 00:00:00.999999')
checkData(sql,0,1,1)
checkData(sql,1,1,8)
checkData(sql,2,1,2)
checkData(sql,5,1,5)
console.log('*******************************************')
// ms basic case
c1.execute('reset query cache')
c1.execute('drop database if exists msdb')
c1.execute('create database msdb precision "ms";')
c1.execute('use msdb');
c1.execute('create table tb (ts timestamp, speed int)')
c1.execute('insert into tb values(\'2021-06-10 00:00:00.101\', 1);')
c1.execute('insert into tb values(1623254400150, 2);')
c1.execute('import into tb values(1623254400300, 3);')
c1.execute('import into tb values(1623254400299, 4);')
c1.execute('insert into tb values(1623254400301, 5);')
c1.execute('insert into tb values(1623254400789, 7);')
c1.execute('insert into tb values(1623254400999, 8);')
sql = 'select * from tb;'
console.log('*******************************************')
console.log('checking data inserted into the ms database')
//check inserted data
checkData(sql,0,0,'2021-06-10 00:00:00.101')
checkData(sql,1,0,'2021-06-10 00:00:00.150')
checkData(sql,2,0,'2021-06-10 00:00:00.299')
checkData(sql,3,0,'2021-06-10 00:00:00.300')
checkData(sql,4,0,'2021-06-10 00:00:00.301')
checkData(sql,5,0,'2021-06-10 00:00:00.789')
checkData(sql,6,0,'2021-06-10 00:00:00.999')
checkData(sql,0,1,1)
checkData(sql,1,1,2)
checkData(sql,2,1,4)
checkData(sql,5,1,7)
console.log('*******************************************')
// official query result to show
// console.log('this is area about fetch all data')
// var query = c1.query(sql)
// var promise = query.execute();
// promise.then(function(result) {
// result.pretty();
// });
console.log('*******************************************')
c1.execute('use db')
sql2 = 'select count(*) from tb where ts > 1623254400100000000 and ts < 1623254400100000002;'
checkData(sql2,0,0,1)
sql3 = 'select count(*) from tb where ts > \'2021-06-10 0:00:00.100000001\' and ts < \'2021-06-10 0:00:00.160000000\';'
checkData(sql3,0,0,2)
sql4 = 'select count(*) from tb where ts > 1623254400100000000 and ts < 1623254400150000000;'
checkData(sql4,0,0,2)
sql5 = 'select count(*) from tb where ts > \'2021-06-10 0:00:00.100000000\' and ts < \'2021-06-10 0:00:00.150000000\';'
checkData(sql5,0,0,2)
sql6 = 'select count(*) from tb where ts > 1623254400400000000;'
checkData(sql6,0,0,1)
sql7 = 'select count(*) from tb where ts < \'2021-06-10 00:00:00.400000000\';'
checkData(sql7,0,0,6)
sql8 = 'select count(*) from tb where ts > now + 400000000b;'
c1.execute(sql8)
sql9 = 'select count(*) from tb where ts >= \'2021-06-10 0:00:00.100000001\';'
checkData(sql9,0,0,7)
sql10 = 'select count(*) from tb where ts <= 1623254400300000000;'
checkData(sql10,0,0,5)
sql11 = 'select count(*) from tb where ts = \'2021-06-10 0:00:00.000000000\';'
c1.execute(sql11)
sql12 = 'select count(*) from tb where ts = 1623254400150000000;'
checkData(sql12,0,0,1)
sql13 = 'select count(*) from tb where ts = \'2021-06-10 0:00:00.100000001\';'
checkData(sql13,0,0,1)
sql14 = 'select count(*) from tb where ts between 1623254400000000000 and 1623254400400000000;'
checkData(sql14,0,0,6)
sql15 = 'select count(*) from tb where ts between \'2021-06-10 0:00:00.299999999\' and \'2021-06-10 0:00:00.300000001\';'
checkData(sql15,0,0,3)
sql16 = 'select avg(speed) from tb interval(5000000000b);'
checkData(sql16,0,0,'2021-06-10 00:00:00.000000000')
sql17 = 'select avg(speed) from tb interval(100000000b)'
checkData(sql17,0,1,3.6666666666666665)
checkData(sql17,1,1,4.000000000)
checkData(sql17,2,0,'2021-06-10 00:00:00.300000000')
checkData(sql17,3,0,'2021-06-10 00:00:00.900000000')
console.log("print break ")
// sql18 = 'select avg(speed) from tb interval(999b)'
// c1.execute(sql18)
console.log("print break2 ")
sql19 = 'select avg(speed) from tb interval(1u);'
checkData(sql19,2,1,2.000000000)
checkData(sql19,3,0,'2021-06-10 00:00:00.299999000')
sql20 = 'select avg(speed) from tb interval(100000000b) sliding (100000000b);'
checkData(sql20,2,1,4.000000000)
checkData(sql20,3,0,'2021-06-10 00:00:00.900000000')
sql21 = 'select last(*) from tb;'
checkData(sql21,0,0,'2021-06-10 00:00:00.999999999')
sql22 = 'select first(*) from tb;'
checkData(sql22,0,0,'2021-06-10 00:00:00.100000001')
// timezone support
console.log('testing nanosecond support in other timestamps')
c1.execute('create table tb2 (ts timestamp, speed int, ts2 timestamp);')
c1.execute('insert into tb2 values(\'2021-06-10 0:00:00.100000001\', 1, \'2021-06-11 0:00:00.100000001\');')
c1.execute('insert into tb2 values(1623254400150000000, 2, 1623340800150000000);')
c1.execute('import into tb2 values(1623254400300000000, 3, 1623340800300000000);')
c1.execute('import into tb2 values(1623254400299999999, 4, 1623340800299999999);')
c1.execute('insert into tb2 values(1623254400300000001, 5, 1623340800300000001);')
c1.execute('insert into tb2 values(1623254400999999999, 7, 1623513600999999999);')
sql23 = 'select * from tb2;'
checkData(sql23,0,0,'2021-06-10 00:00:00.100000001')
checkData(sql23,1,0,'2021-06-10 00:00:00.150000000')
checkData(sql23,2,1,4)
checkData(sql23,3,1,3)
checkData(sql23,4,2,'2021-06-11 00:00:00.300000001')
checkData(sql23,5,2,'2021-06-13 00:00:00.999999999')
sql24 = 'select count(*) from tb2 where ts2 >= \'2021-06-11 0:00:00.100000001\';'
checkData(sql24,0,0,6)
sql25 = 'select count(*) from tb2 where ts2 <= 1623340800400000000;'
checkData(sql25,0,0,5)
sql26 = 'select count(*) from tb2 where ts2 = \'2021-06-11 0:00:00.300000001\';'
checkData(sql26,0,0,1)
sql27 = 'select count(*) from tb2 where ts2 = 1623340800300000001;'
checkData(sql27,0,0,1)
sql28 = 'select count(*) from tb2 where ts2 between 1623340800000000000 and 1623340800450000000;'
checkData(sql28,0,0,5)
sql29 = 'select count(*) from tb2 where ts2 between \'2021-06-11 0:00:00.299999999\' and \'2021-06-11 0:00:00.300000001\';'
checkData(sql29,0,0,3)
sql30 = 'select count(*) from tb2 where ts2 <> 1623513600999999999;'
checkData(sql30,0,0,5)
sql31 = 'select count(*) from tb2 where ts2 <> \'2021-06-11 0:00:00.100000001\';'
checkData(sql31,0,0,5)
sql32 = 'select count(*) from tb2 where ts2 != 1623513600999999999;'
checkData(sql32,0,0,5)
sql33 = 'select count(*) from tb2 where ts2 != \'2021-06-11 0:00:00.100000001\';'
checkData(sql33,0,0,5)
c1.execute('insert into tb2 values(now + 500000000b, 6, now +2d);')
sql34 = 'select count(*) from tb2;'
checkData(sql34,0,0,7)
// check timezone support
c1.execute('use db;')
c1.execute('create stable st (ts timestamp ,speed float ) tags(time timestamp ,id int);')
c1.execute('insert into stb1 using st tags("2021-06-10 0:00:00.123456789" , 1 ) values("2021-06-10T0:00:00.123456789+07:00" , 1.0);' )
sql35 = 'select first(*) from stb1;'
checkData(sql35,0,0,'2021-06-10 01:00:00.123456789')
c1.execute('use usdb;')
c1.execute('create stable st (ts timestamp ,speed float ) tags(time timestamp ,id int);')
c1.execute('insert into stb1 using st tags("2021-06-10 0:00:00.123456" , 1 ) values("2021-06-10T0:00:00.123456+07:00" , 1.0);' )
sql36 = 'select first(*) from stb1;'
checkData(sql36,0,0,'2021-06-10 01:00:00.123456')
c1.execute('use msdb;')
c1.execute('create stable st (ts timestamp ,speed float ) tags(time timestamp ,id int);')
c1.execute('insert into stb1 using st tags("2021-06-10 0:00:00.123456" , 1 ) values("2021-06-10T0:00:00.123456+07:00" , 1.0);' )
sql36 = 'select first(*) from stb1;'
checkData(sql36,0,0,'2021-06-10 01:00:00.123')
const TDengineCursor = require('./cursor')
const CTaosInterface = require('./cinterface')
module.exports = TDengineConnection;
/**
* TDengine Connection Class
* @param {object} options - Options for configuring the connection with TDengine
* @return {TDengineConnection}
* @class TDengineConnection
* @constructor
* @example
* //Initialize a new connection
* var conn = new TDengineConnection({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:0})
*
*/
function TDengineConnection(options) {
this._conn = null;
this._host = null;
this._user = "root"; //The default user
this._password = "taosdata"; //The default password
this._database = null;
this._port = 0;
this._config = null;
this._chandle = null;
this._configConn(options)
return this;
}
/**
* Configure the connection to TDengine
* @private
* @memberof TDengineConnection
*/
TDengineConnection.prototype._configConn = function _configConn(options) {
if (options['host']) {
this._host = options['host'];
}
if (options['user']) {
this._user = options['user'];
}
if (options['password']) {
this._password = options['password'];
}
if (options['database']) {
this._database = options['database'];
}
if (options['port']) {
this._port = options['port'];
}
if (options['config']) {
this._config = options['config'];
}
this._chandle = new CTaosInterface(this._config);
this._conn = this._chandle.connect(this._host, this._user, this._password, this._database, this._port);
}
/** Close the connection to TDengine */
TDengineConnection.prototype.close = function close() {
this._chandle.close(this._conn);
}
/**
 * Initialize a new cursor to interact with TDengine
* @return {TDengineCursor}
*/
TDengineConnection.prototype.cursor = function cursor() {
//Pass the connection object to the cursor
return new TDengineCursor(this);
}
TDengineConnection.prototype.commit = function commit() {
return this;
}
TDengineConnection.prototype.rollback = function rollback() {
return this;
}
/**
* Clear the results from connector
* @private
*/
/*
TDengineConnection.prototype._clearResultSet = function _clearResultSet() {
var result = this._chandle.useResult(this._conn).result;
if (result) {
this._chandle.freeResult(result)
}
}
*/
/* Invoke an optional callback; a no-op when none is supplied */
function wrapCB(callback, input) {
if (typeof callback === 'function') {
callback(input);
}
return;
}
global.wrapCB = wrapCB;
function toTaosTSString(date) {
date = new Date(date);
let tsArr = date.toISOString().split("T")
return tsArr[0] + " " + tsArr[1].substring(0, tsArr[1].length-1);
}
global.toTaosTSString = toTaosTSString;
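// Example: toTaosTSString(1625801548423) returns "2021-07-09 03:32:28.423",
// i.e. the UTC components with the ISO "T" separator and trailing "Z" removed.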
var TDengineConnection = require('./nodetaos/connection.js')
module.exports.connect = function (connection={}) {
return new TDengineConnection(connection);
}
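// connect() simply forwards its options object to TDengineConnection, e.g.:
// var conn = taos.connect({ host: '127.0.0.1', user: 'root', password: 'taosdata' })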
const taos = require('../tdengine');
var conn = taos.connect();
var c1 = conn.cursor();
let stime = new Date();
let interval = 1000;
function convertDateToTS(date) {
let tsArr = date.toISOString().split("T")
return "\"" + tsArr[0] + " " + tsArr[1].substring(0, tsArr[1].length - 1) + "\"";
}
function R(l, r) {
return Math.random() * (r - l) + l; // uniform value in [l, r)
}
function randomBool() {
return Math.random() < 0.5;
}
// Initialize
//c1.execute('drop database td_connector_test;');
const dbname = 'nodejs_test_us';
c1.execute('create database if not exists ' + dbname + ' precision "us"');
c1.execute('use ' + dbname)
c1.execute('create table if not exists tstest (ts timestamp, _int int);');
c1.execute('insert into tstest values(1625801548423914, 0)');
// Select
console.log('select * from tstest');
c1.execute('select * from tstest');
var d = c1.fetchall();
console.log(c1.fields);
let ts = d[0][0];
console.log(ts);
if (ts.taosTimestamp() != 1625801548423914) {
throw "microseconds do not match!";
}
if (ts.getMicroseconds() % 1000 !== 914) {
throw "microsecond precision error";
}
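// Arithmetic behind the two checks: 1625801548423914 us splits into
// 1625801548423 ms plus a 914 us remainder, so the sub-millisecond part
// of the returned timestamp must be 914.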
setTimeout(function () {
c1.execute('drop database ' + dbname + ';');
}, 200);
setTimeout(function () {
conn.close();
}, 2000);
1641976781445,1
1641976781446,2
1641976781447,3