提交 3f7c27b2 编写于 作者: Z zyyang

change

...@@ -45,7 +45,7 @@ def pre_test(){ ...@@ -45,7 +45,7 @@ def pre_test(){
git pull git pull
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|//src//connector|Jenkinsfile' || exit 0 git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|.*src/connector|Jenkinsfile' || exit 0
cd ${WK} cd ${WK}
git reset --hard HEAD~10 git reset --hard HEAD~10
git checkout develop git checkout develop
......
...@@ -2410,7 +2410,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2410,7 +2410,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
// record the total inserted rows // record the total inserted rows
if (numOfRows > 0) { if (numOfRows > 0) {
pParentObj->res.numOfRows += numOfRows; atomic_add_fetch_32(&pParentObj->res.numOfRows, numOfRows);
} }
if (taos_errno(tres) != TSDB_CODE_SUCCESS) { if (taos_errno(tres) != TSDB_CODE_SUCCESS) {
......
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
<maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version> <maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
<commons-logging.version>1.1.2</commons-logging.version> <commons-logging.version>1.1.2</commons-logging.version>
<commons-lang3.version>3.5</commons-lang3.version> <commons-lang3.version>3.5</commons-lang3.version>
<maven.test.jvmargs></maven.test.jvmargs>
</properties> </properties>
<dependencies> <dependencies>
<dependency> <dependency>
...@@ -122,11 +123,14 @@ ...@@ -122,11 +123,14 @@
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version> <version>2.12.4</version>
<configuration> <configuration>
<forkMode>pertest</forkMode>
<argLine>${maven.test.jvmargs}</argLine>
<includes> <includes>
<include>**/*Test.java</include> <include>**/*Test.java</include>
</includes> </includes>
<excludes> <excludes>
<exclude>**/AppMemoryLeakTest.java</exclude> <exclude>**/AppMemoryLeakTest.java</exclude>
<exclude>**/TaosInfoMonitorTest.java</exclude>
<exclude>**/FailOverTest.java</exclude> <exclude>**/FailOverTest.java</exclude>
</excludes> </excludes>
<testFailureIgnore>true</testFailureIgnore> <testFailureIgnore>true</testFailureIgnore>
......
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
*****************************************************************************/ *****************************************************************************/
package com.taosdata.jdbc; package com.taosdata.jdbc;
import com.taosdata.jdbc.utils.TaosInfo;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLWarning; import java.sql.SQLWarning;
import java.util.List; import java.util.List;
...@@ -21,6 +23,8 @@ import java.util.List; ...@@ -21,6 +23,8 @@ import java.util.List;
public class TSDBJNIConnector { public class TSDBJNIConnector {
private static volatile Boolean isInitialized = false; private static volatile Boolean isInitialized = false;
private TaosInfo taosInfo = TaosInfo.getInstance();
static { static {
System.loadLibrary("taos"); System.loadLibrary("taos");
System.out.println("java.library.path:" + System.getProperty("java.library.path")); System.out.println("java.library.path:" + System.getProperty("java.library.path"));
...@@ -91,7 +95,8 @@ public class TSDBJNIConnector { ...@@ -91,7 +95,8 @@ public class TSDBJNIConnector {
*/ */
public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException { public boolean connect(String host, int port, String dbName, String user, String password) throws SQLException {
if (this.taos != TSDBConstants.JNI_NULL_POINTER) { if (this.taos != TSDBConstants.JNI_NULL_POINTER) {
this.closeConnectionImp(this.taos); // this.closeConnectionImp(this.taos);
closeConnection();
this.taos = TSDBConstants.JNI_NULL_POINTER; this.taos = TSDBConstants.JNI_NULL_POINTER;
} }
...@@ -99,7 +104,8 @@ public class TSDBJNIConnector { ...@@ -99,7 +104,8 @@ public class TSDBJNIConnector {
if (this.taos == TSDBConstants.JNI_NULL_POINTER) { if (this.taos == TSDBConstants.JNI_NULL_POINTER) {
throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg(0L)), "", this.getErrCode(0l)); throw new SQLException(TSDBConstants.WrapErrMsg(this.getErrMsg(0L)), "", this.getErrCode(0l));
} }
// invoke connectImp only here
taosInfo.conn_open_increment();
return true; return true;
} }
...@@ -120,6 +126,7 @@ public class TSDBJNIConnector { ...@@ -120,6 +126,7 @@ public class TSDBJNIConnector {
Long pSql = 0l; Long pSql = 0l;
try { try {
pSql = this.executeQueryImp(sql.getBytes(TaosGlobalConfig.getCharset()), this.taos); pSql = this.executeQueryImp(sql.getBytes(TaosGlobalConfig.getCharset()), this.taos);
taosInfo.stmt_count_increment();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
this.freeResultSetImp(this.taos, pSql); this.freeResultSetImp(this.taos, pSql);
...@@ -244,10 +251,11 @@ public class TSDBJNIConnector { ...@@ -244,10 +251,11 @@ public class TSDBJNIConnector {
private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData); private native int fetchRowImp(long connection, long resultSet, TSDBResultSetRowData rowData);
public int fetchBlock(long resultSet, TSDBResultSetBlockData blockData) { public int fetchBlock(long resultSet, TSDBResultSetBlockData blockData) {
return this.fetchBlockImp(this.taos, resultSet, blockData); return this.fetchBlockImp(this.taos, resultSet, blockData);
} }
private native int fetchBlockImp(long connection, long resultSet, TSDBResultSetBlockData blockData); private native int fetchBlockImp(long connection, long resultSet, TSDBResultSetBlockData blockData);
/** /**
* Execute close operation from C to release connection pointer by JNI * Execute close operation from C to release connection pointer by JNI
* *
...@@ -262,6 +270,8 @@ public class TSDBJNIConnector { ...@@ -262,6 +270,8 @@ public class TSDBJNIConnector {
} else { } else {
throw new SQLException("Undefined error code returned by TDengine when closing a connection"); throw new SQLException("Undefined error code returned by TDengine when closing a connection");
} }
// invoke closeConnectionImpl only here
taosInfo.connect_close_increment();
} }
private native int closeConnectionImp(long connection); private native int closeConnectionImp(long connection);
......
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
*****************************************************************************/ *****************************************************************************/
package com.taosdata.jdbc; package com.taosdata.jdbc;
import com.taosdata.jdbc.utils.TaosInfo;
import java.sql.*; import java.sql.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
......
package com.taosdata.jdbc.utils;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class TDNode {
private int index;
private int running;
private int deployed;
private boolean testCluster;
private String path;
private String cfgDir;
private String dataDir;
private String logDir;
private String cfgPath;
public TDNode(int index) {
this.index = index;
running = 0;
deployed = 0;
testCluster = false;
}
public void setPath(String path) {
this.path = path;
}
public void setTestCluster(boolean testCluster) {
this.testCluster = testCluster;
}
public void setRunning(int running) {
this.running = running;
}
public void searchTaosd(File dir, ArrayList<String> taosdPath) {
File[] fileList = dir.listFiles();
if(fileList == null || fileList.length == 0) {
return;
}
for(File file : fileList) {
if(file.isFile()) {
if(file.getName().equals("taosd")) {
taosdPath.add(file.getAbsolutePath());
}
} else {
searchTaosd(file, taosdPath);
}
}
}
public void start() {
String selfPath = System.getProperty("user.dir");
String binPath = "";
String projDir = selfPath + "/../../../";
try {
ArrayList<String> taosdPath = new ArrayList<>();
File dir = new File(projDir);
String realProjDir = dir.getCanonicalPath();
dir = new File(realProjDir);
System.out.println("project Dir: " + projDir);
searchTaosd(dir, taosdPath);
if(taosdPath.size() == 0) {
System.out.println("The project path doens't exist");
return;
} else {
for(String p : taosdPath) {
if(!p.contains("packaging")) {
binPath = p;
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
if(binPath.isEmpty()) {
System.out.println("taosd not found");
return;
} else {
System.out.println("taosd found in " + binPath);
}
if(this.deployed == 0) {
System.out.println("dnode" + index + "is not deployed");
return;
}
String cmd = "nohup " + binPath + " -c " + cfgDir + " > /dev/null 2>&1 & ";
System.out.println("start taosd cmd: " + cmd);
try{
Runtime.getRuntime().exec(cmd);
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
this.running = 1;
}
public Integer getTaosdPid() {
String cmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'";
String[] cmds = {"sh", "-c", cmd};
try {
Process process = Runtime.getRuntime().exec(cmds);
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line = null;
Integer res = null;
while((line = reader.readLine()) != null) {
if(!line.isEmpty()) {
res = Integer.valueOf(line);
break;
}
}
return res;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public void stop() {
if (this.running != 0) {
Integer pid = null;
while((pid = getTaosdPid()) != null) {
String killCmd = "kill -term " + pid;
String[] killCmds = {"sh", "-c", killCmd};
try {
Runtime.getRuntime().exec(killCmds).waitFor();
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
}
try {
for(int port = 6030; port < 6041; port ++) {
String fuserCmd = "fuser -k -n tcp " + port;
Runtime.getRuntime().exec(fuserCmd).waitFor();
}
} catch (Exception e) {
e.printStackTrace();
}
this.running = 0;
System.out.println("dnode:" + this.index + " is stopped by kill -term");
}
}
public void startIP() {
try{
String cmd = "sudo ifconfig lo:" + index + "192.168.0." + index + " up";
Runtime.getRuntime().exec(cmd).waitFor();
} catch (Exception e) {
e.printStackTrace();
}
}
public void stopIP() {
try{
String cmd = "sudo ifconfig lo:" + index + "192.168.0." + index + " down";
Runtime.getRuntime().exec(cmd).waitFor();
} catch (Exception e) {
e.printStackTrace();
}
}
public void setCfgConfig(String option, String value) {
try{
String cmd = "echo " + option + " " + value + " >> " + this.cfgPath;
String[] cmdLine = {"sh", "-c", cmd};
Process ps = Runtime.getRuntime().exec(cmdLine);
ps.waitFor();
} catch (Exception e) {
e.printStackTrace();
}
}
public String getDnodeRootDir() {
String dnodeRootDir = this.path + "/sim/psim/dnode" + this.index;
return dnodeRootDir;
}
public String getDnodesRootDir() {
String dnodesRootDir = this.path + "/sim/psim" + this.index;
return dnodesRootDir;
}
public void deploy() {
this.logDir = this.path + "/sim/dnode" + this.index + "/log";
this.dataDir = this.path + "/sim/dnode" + this.index + "/data";
this.cfgDir = this.path + "/sim/dnode" + this.index + "/cfg";
this.cfgPath = this.path + "/sim/dnode" + this.index + "/cfg/taos.cfg";
try {
String cmd = "rm -rf " + this.logDir;
Runtime.getRuntime().exec(cmd).waitFor();
cmd = "rm -rf " + this.cfgDir;
Runtime.getRuntime().exec(cmd).waitFor();
cmd = "rm -rf " + this.dataDir;
Runtime.getRuntime().exec(cmd).waitFor();
cmd = "mkdir -p " + this.logDir;
Runtime.getRuntime().exec(cmd).waitFor();
cmd = "mkdir -p " + this.cfgDir;
Runtime.getRuntime().exec(cmd).waitFor();
cmd = "mkdir -p " + this.dataDir;
Runtime.getRuntime().exec(cmd).waitFor();
cmd = "touch " + this.cfgPath;
Runtime.getRuntime().exec(cmd).waitFor();
} catch (Exception e) {
e.printStackTrace();
}
if(this.testCluster) {
startIP();
setCfgConfig("masterIp", "192.168.0.1");
setCfgConfig("secondIp", "192.168.0.2");
setCfgConfig("publicIp", "192.168.0." + this.index);
setCfgConfig("internalIp", "192.168.0." + this.index);
setCfgConfig("privateIp", "192.168.0." + this.index);
}
setCfgConfig("dataDir", this.dataDir);
setCfgConfig("logDir", this.logDir);
setCfgConfig("numOfLogLines", "1000000/00");
setCfgConfig("mnodeEqualVnodeNum", "0");
setCfgConfig("walLevel", "1");
setCfgConfig("statusInterval", "1");
setCfgConfig("numOfMnodes", "3");
setCfgConfig("numOfThreadsPerCore", "2.0");
setCfgConfig("monitor", "0");
setCfgConfig("maxVnodeConnections", "30000");
setCfgConfig("maxMgmtConnections", "30000");
setCfgConfig("maxMeterConnections", "30000");
setCfgConfig("maxShellConns", "30000");
setCfgConfig("locale", "en_US.UTF-8");
setCfgConfig("charset", "UTF-8");
setCfgConfig("asyncLog", "0");
setCfgConfig("anyIp", "0");
setCfgConfig("dDebugFlag", "135");
setCfgConfig("mDebugFlag", "135");
setCfgConfig("sdbDebugFlag", "135");
setCfgConfig("rpcDebugFlag", "135");
setCfgConfig("tmrDebugFlag", "131");
setCfgConfig("cDebugFlag", "135");
setCfgConfig("httpDebugFlag", "135");
setCfgConfig("monitorDebugFlag", "135");
setCfgConfig("udebugFlag", "135");
setCfgConfig("jnidebugFlag", "135");
setCfgConfig("qdebugFlag", "135");
this.deployed = 1;
}
}
\ No newline at end of file
package com.taosdata.jdbc.utils;
import java.io.File;
import java.util.*;
public class TDNodes {
private ArrayList<TDNode> tdNodes;
private boolean testCluster;
public TDNodes () {
tdNodes = new ArrayList<>();
for(int i = 1; i < 11; i ++) {
tdNodes.add(new TDNode(i));
}
}
public void setTestCluster(boolean testCluster) {
this.testCluster = testCluster;
}
public void check(int index) {
if(index < 1 || index > 10) {
System.out.println("index: " + index + " should on a scale of [1, 10]");
return;
}
}
public void deploy(int index) {
try {
File file = new File(System.getProperty("user.dir") + "/../../../");
String projectRealPath = file.getCanonicalPath();
check(index);
tdNodes.get(index - 1).setTestCluster(this.testCluster);
tdNodes.get(index - 1).setPath(projectRealPath);
tdNodes.get(index - 1).deploy();
} catch (Exception e) {
e.printStackTrace();
System.out.println("deploy Test Exception");
}
}
public void cfg(int index, String option, String value) {
check(index);
tdNodes.get(index - 1).setCfgConfig(option, value);
}
public TDNode getTDNode(int index) {
check(index);
return tdNodes.get(index - 1);
}
public void start(int index) {
check(index);
tdNodes.get(index - 1).start();
}
public void stop(int index) {
check(index);
tdNodes.get(index - 1).stop();
}
public void startIP(int index) {
check(index);
tdNodes.get(index - 1).startIP();
}
public void stopIP(int index) {
check(index);
tdNodes.get(index - 1).stopIP();
}
}
\ No newline at end of file
package com.taosdata.jdbc.utils;
import javax.management.*;
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
public class TaosInfo implements TaosInfoMBean {
private static volatile TaosInfo instance;
private AtomicLong connect_open = new AtomicLong();
private AtomicLong connect_close = new AtomicLong();
private AtomicLong statement_count = new AtomicLong();
static {
try {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("TaosInfoMBean:name=TaosInfo");
server.registerMBean(TaosInfo.getInstance(), name);
} catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
e.printStackTrace();
}
}
@Override
public long getConnect_open() {
return connect_open.get();
}
@Override
public long getConnect_close() {
return connect_close.get();
}
@Override
public long getConnect_active() {
return connect_open.get() - connect_close.get();
}
@Override
public long getStatement_count() {
return statement_count.get();
}
/*******************************************************/
public void conn_open_increment() {
connect_open.incrementAndGet();
}
public void connect_close_increment() {
connect_close.incrementAndGet();
}
public void stmt_count_increment() {
statement_count.incrementAndGet();
}
/********************************************************************************/
private TaosInfo() {
}
public static TaosInfo getInstance() {
if (instance == null) {
synchronized (TaosInfo.class) {
if (instance == null) {
instance = new TaosInfo();
}
}
}
return instance;
}
}
package com.taosdata.jdbc.utils;
public interface TaosInfoMBean {
long getConnect_open();
long getConnect_close();
long getConnect_active();
long getStatement_count();
}
package com.taosdata.jdbc.cases;
import org.junit.Test;
import java.sql.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class TaosInfoMonitorTest {
@Test
public void testCreateTooManyConnection() throws ClassNotFoundException {
Class.forName("com.taosdata.jdbc.TSDBDriver");
final String url = "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata";
List<Connection> connectionList = IntStream.range(0, 100).mapToObj(i -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
return DriverManager.getConnection(url);
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
}
return null;
}).collect(Collectors.toList());
connectionList.stream().forEach(conn -> {
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("show databases");
while (rs.next()) {
}
TimeUnit.MILLISECONDS.sleep(100);
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
}
});
connectionList.stream().forEach(conn -> {
try {
conn.close();
TimeUnit.MILLISECONDS.sleep(100);
} catch (SQLException | InterruptedException e) {
e.printStackTrace();
}
});
}
}
...@@ -227,6 +227,7 @@ void cqClose(void *handle) { ...@@ -227,6 +227,7 @@ void cqClose(void *handle) {
taosRemoveRef(cqObjRef, rid); taosRemoveRef(cqObjRef, rid);
} }
freeSCqContext(pContext); freeSCqContext(pContext);
} }
......
...@@ -173,6 +173,7 @@ typedef struct SSuperTable_S { ...@@ -173,6 +173,7 @@ typedef struct SSuperTable_S {
int childTblCount; int childTblCount;
bool superTblExists; // 0: no, 1: yes bool superTblExists; // 0: no, 1: yes
bool childTblExists; // 0: no, 1: yes bool childTblExists; // 0: no, 1: yes
int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
char childTblPrefix[MAX_TB_NAME_SIZE]; char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
...@@ -808,13 +809,14 @@ static void init_rand_data() { ...@@ -808,13 +809,14 @@ static void init_rand_data() {
static void printfInsertMeta() { static void printfInsertMeta() {
printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n"); printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n");
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user); printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
printf("password: \033[33m%s\033[0m\n", g_Dbs.password); printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile); printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
printf("thread count: \033[33m%d\033[0m\n", g_Dbs.threadCount); printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount);
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
printf("database[\033[33m%d\033[0m]:\n", i); printf("database[\033[33m%d\033[0m]:\n", i);
printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName); printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName);
...@@ -944,11 +946,12 @@ static void printfInsertMeta() { ...@@ -944,11 +946,12 @@ static void printfInsertMeta() {
static void printfInsertMetaToFile(FILE* fp) { static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, "================ insert.json parse result START================\n"); fprintf(fp, "================ insert.json parse result START================\n");
fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port); fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
fprintf(fp, "user: %s\n", g_Dbs.user); fprintf(fp, "user: %s\n", g_Dbs.user);
fprintf(fp, "password: %s\n", g_Dbs.password); fprintf(fp, "password: %s\n", g_Dbs.password);
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
fprintf(fp, "thread count: %d\n", g_Dbs.threadCount); fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
fprintf(fp, "database count: %d\n", g_Dbs.dbCount); fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
...@@ -1730,19 +1733,27 @@ static int createDatabases() { ...@@ -1730,19 +1733,27 @@ static int createDatabases() {
void * createTable(void *sarg) void * createTable(void *sarg)
{ {
char command[BUFFER_SIZE] = "\0";
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo; SSuperTable* superTblInfo = winfo->superTblInfo;
int64_t lastPrintTime = taosGetTimestampMs(); int64_t lastPrintTime = taosGetTimestampMs();
char* buffer = calloc(superTblInfo->maxSqlLen, 1);
int len = 0;
int batchNum = 0;
//printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); //printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
if (0 == g_Dbs.use_metric) { if (0 == g_Dbs.use_metric) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable); snprintf(buffer, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
} else { } else {
if (0 == len) {
batchNum = 0;
memset(buffer, 0, superTblInfo->maxSqlLen);
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "create table ");
}
char* tagsValBuf = NULL; char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) { if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo); tagsValBuf = generateTagVaulesForStb(superTblInfo);
...@@ -1750,13 +1761,22 @@ void * createTable(void *sarg) ...@@ -1750,13 +1761,22 @@ void * createTable(void *sarg)
tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount); tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount);
} }
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
free(buffer);
return NULL; return NULL;
} }
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.%s tags %s;", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "if not exists %s.%s%d using %s.%s tags %s ", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
free(tagsValBuf); free(tagsValBuf);
batchNum++;
if ((batchNum < superTblInfo->batchCreateTableNum) && ((superTblInfo->maxSqlLen - len) >= (superTblInfo->lenOfTagOfOneRow + 256))) {
continue;
}
} }
if (0 != queryDbExec(winfo->taos, command, NO_INSERT_TYPE)){ len = 0;
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
free(buffer);
return NULL; return NULL;
} }
...@@ -1766,7 +1786,12 @@ void * createTable(void *sarg) ...@@ -1766,7 +1786,12 @@ void * createTable(void *sarg)
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
} }
if (0 != len) {
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
}
free(buffer);
return NULL; return NULL;
} }
...@@ -2422,6 +2447,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -2422,6 +2447,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("failed to read json, auto_create_table not found"); printf("failed to read json, auto_create_table not found");
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
} else if (!batchCreateTbl) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = 2000;
} else {
printf("failed to read json, batch_create_tbl_num not found");
goto PARSE_OVER;
}
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) { if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) {
...@@ -3679,14 +3714,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu ...@@ -3679,14 +3714,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
b = ntables % threads; b = ntables % threads;
} }
TAOS* taos; //TAOS* taos;
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) { // if (NULL == taos) {
printf("connect to server fail, reason: %s\n", taos_errstr(NULL)); // printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
exit(-1); // exit(-1);
} // }
} //}
int32_t timePrec = TSDB_TIME_PRECISION_MILLI; int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) { if (0 != precision[0]) {
...@@ -3719,7 +3754,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu ...@@ -3719,7 +3754,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
t_info->start_time = start_time; t_info->start_time = start_time;
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
t_info->taos = taos; //t_info->taos = taos;
t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL));
exit(-1);
}
} else { } else {
t_info->taos = NULL; t_info->taos = NULL;
#ifdef TD_LOWA_CURL #ifdef TD_LOWA_CURL
...@@ -3754,6 +3794,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu ...@@ -3754,6 +3794,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
tsem_destroy(&(t_info->lock_sem)); tsem_destroy(&(t_info->lock_sem));
taos_close(t_info->taos);
superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalRowsInserted += t_info->totalRowsInserted; superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
...@@ -3766,7 +3807,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu ...@@ -3766,7 +3807,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
double end = getCurrentTime(); double end = getCurrentTime();
taos_close(taos); //taos_close(taos);
free(pids); free(pids);
free(infos); free(infos);
......
...@@ -99,7 +99,7 @@ python3 test.py -f query/queryFillTest.py ...@@ -99,7 +99,7 @@ python3 test.py -f query/queryFillTest.py
python3 test.py -f tools/taosdemoTest.py python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdumpTest.py python3 test.py -f tools/taosdumpTest.py
python3 test.py -f tools/lowaTest.py python3 test.py -f tools/lowaTest.py
python3 test.py -f tools/taosdemoTest2.py #python3 test.py -f tools/taosdemoTest2.py
# subscribe # subscribe
python3 test.py -f subscribe/singlemeter.py python3 test.py -f subscribe/singlemeter.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册