提交 92f89c48 编写于 作者: S shenglian zhou

(query):Merge branch 'develop' into szhou/feature/string-case-transform-substr-trim

......@@ -134,7 +134,7 @@ taos> source <filename>;
## <a class="anchor" id="demo"></a>TDengine 极速体验
启动 TDengine 的服务,在 Linux 终端执行 taosBenchmark (曾命名为 taosdemo):
启动 TDengine 的服务,在 Linux 终端执行 taosBenchmark (曾命名为 taosdemo,在 2.4 之后的版本请按照独立的 taosTools 软件包):
```bash
$ taosBenchmark
......
......@@ -46,10 +46,12 @@ TDengine 的 JDBC 驱动实现尽可能与关系型数据库驱动保持一致
</tr>
</table>
注意:与 JNI 方式不同,RESTful 接口是无状态的。在使用JDBC-RESTful时,需要在sql中指定表、超级表的数据库名称。(从 TDengine 2.2.0.0 版本开始,也可以在 RESTful url 中指定当前 SQL 语句所使用的默认数据库名。)例如:
注意:
* 与 JNI 方式不同,RESTful 接口是无状态的。在使用JDBC-RESTful时,需要在sql中指定表、超级表的数据库名称。例如:
```sql
INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('beijing') VALUES(now, 24.6);
```
* 从taos-jdbcdriver-2.0.36和TDengine 2.2.0.0 版本开始,如果在url中指定了dbname,那么,JDBC-RESTful会默认使用/rest/sql/dbname作为resful请求的url,在sql中不需要指定dbname。例如:url为jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行sql:insert into t1 using weather(ts, temperatrue) tags('beijing') values(now, 24.6);
## <a class="anchor" id="version"></a>TAOS-JDBCDriver 版本以及支持的 TDengine 版本和 JDK 版本
......
......@@ -138,7 +138,7 @@ taos> source <filename>;
## <a class="anchor" id="demo"></a>Experience TDengine’s Lightning Speed
After starting the TDengine server, you can execute the command `taosBenchmark` (was named `taosdemo`) in the Linux terminal.
After starting the TDengine server, you can execute the command `taosBenchmark` (was named `taosdemo`, please install taosTools package if you use TDengine 2.4 or later version) in the Linux terminal.
```bash
$ taosBenchmark
......
......@@ -2007,9 +2007,23 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, in
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
if (bind == NULL || bind->num <= 0 || bind->num > INT16_MAX || colIdx < 0) {
tscError("0x%"PRIx64" invalid parameter", pStmt->pSql->self);
STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "invalid bind param"));
if (bind == NULL) {
tscError("0x%" PRIx64 " invalid parameter: bind is NULL", pStmt->pSql->self);
STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), "invalid bind param: bind is NULL"));
}
if (bind->num <= 0 || bind->num > INT16_MAX) {
char errMsg[128];
sprintf(errMsg, "invalid parameter: bind->num:%d out of range [0, %d)", bind->num, INT16_MAX);
tscError("0x%" PRIx64 " %s", pStmt->pSql->self, errMsg);
STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), errMsg));
}
if (colIdx < 0) {
char errMsg[128];
sprintf(errMsg, "invalid parameter: column index:%d less than 0", colIdx);
tscError("0x%" PRIx64 " %s", pStmt->pSql->self, errMsg);
STMT_RET(invalidOperationMsg(tscGetErrorMsgPayload(&pStmt->pSql->cmd), errMsg));
}
if (!pStmt->isInsert) {
......
......@@ -52,6 +52,11 @@
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......
......@@ -92,6 +92,16 @@ public class TSDBDriver extends AbstractDriver {
*/
public static final String PROPERTY_KEY_BATCH_ERROR_IGNORE = "batchErrorIgnore";
/**
* message receive from server timeout. ms
*/
public static final String PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT = "messageWaitTimeout";
/**
* max message number send to server concurrently
*/
public static final String PROPERTY_KEY_MAX_CONCURRENT_REQUEST = "maxConcurrentRequest";
private TSDBDatabaseMetaData dbMetaData = null;
static {
......
......@@ -4,12 +4,22 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.*;
import com.taosdata.jdbc.utils.HttpClientPoolUtil;
import com.taosdata.jdbc.ws.InFlightRequest;
import com.taosdata.jdbc.ws.Transport;
import com.taosdata.jdbc.ws.WSClient;
import com.taosdata.jdbc.ws.WSConnection;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
public class RestfulDriver extends AbstractDriver {
......@@ -39,20 +49,56 @@ public class RestfulDriver extends AbstractDriver {
String port = props.getProperty(TSDBDriver.PROPERTY_KEY_PORT, "6041");
String database = props.containsKey(TSDBDriver.PROPERTY_KEY_DBNAME) ? props.getProperty(TSDBDriver.PROPERTY_KEY_DBNAME) : null;
String loginUrl;
String user;
String password;
try {
if (!props.containsKey(TSDBDriver.PROPERTY_KEY_USER))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_USER_IS_REQUIRED);
if (!props.containsKey(TSDBDriver.PROPERTY_KEY_PASSWORD))
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PASSWORD_IS_REQUIRED);
String user = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_USER), StandardCharsets.UTF_8.displayName());
String password = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD), StandardCharsets.UTF_8.displayName());
loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":" + props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/" + user + "/" + password + "";
user = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_USER), StandardCharsets.UTF_8.displayName());
password = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD), StandardCharsets.UTF_8.displayName());
} catch (UnsupportedEncodingException e) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE, "unsupported UTF-8 concoding, user: " + props.getProperty(TSDBDriver.PROPERTY_KEY_USER) + ", password: " + props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
}
String loginUrl;
String batchLoad = info.getProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD);
// if (Boolean.parseBoolean(batchLoad)) {
if (false) {
loginUrl = "ws://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST)
+ ":" + props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/ws";
WSClient client;
Transport transport;
try {
int timeout = props.containsKey(TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT)
? Integer.parseInt(props.getProperty(TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT))
: Transport.DEFAULT_MESSAGE_WAIT_TIMEOUT;
int maxRequest = props.containsKey(TSDBDriver.PROPERTY_KEY_MAX_CONCURRENT_REQUEST)
? Integer.parseInt(props.getProperty(TSDBDriver.PROPERTY_KEY_MAX_CONCURRENT_REQUEST))
: Transport.DEFAULT_MAX_REQUEST;
InFlightRequest inFlightRequest = new InFlightRequest(timeout, maxRequest);
CountDownLatch latch = new CountDownLatch(1);
Map<String, String> httpHeaders = new HashMap<>();
client = new WSClient(new URI(loginUrl), user, password, database, inFlightRequest, httpHeaders, latch, maxRequest);
transport = new Transport(client, inFlightRequest);
if (!client.connectBlocking()) {
throw new SQLException("can't create connection with server");
}
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
throw new SQLException("auth timeout");
}
if (client.isAuth()) {
throw new SQLException("auth failure");
}
} catch (URISyntaxException e) {
throw new SQLException("Websocket url parse error: " + loginUrl, e);
} catch (InterruptedException e) {
throw new SQLException("creat websocket connection has been Interrupted ", e);
}
return new WSConnection(url, props, transport, database, true);
}
loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":" + props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/" + user + "/" + password + "";
int poolSize = Integer.parseInt(props.getProperty("httpPoolSize", HttpClientPoolUtil.DEFAULT_MAX_PER_ROUTE));
boolean keepAlive = Boolean.parseBoolean(props.getProperty("httpKeepAlive", HttpClientPoolUtil.DEFAULT_HTTP_KEEP_ALIVE));
......
package com.taosdata.jdbc.ws;
import java.util.Map;
import java.util.concurrent.*;
/**
* Unfinished execution
*/
public class InFlightRequest implements AutoCloseable {
private final int timeoutSec;
private final Semaphore semaphore;
private final Map<String, ResponseFuture> futureMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ScheduledFuture<?> scheduledFuture;
public InFlightRequest(int timeoutSec, int concurrentNum) {
this.timeoutSec = timeoutSec;
this.semaphore = new Semaphore(concurrentNum);
this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::removeTimeoutFuture, timeoutSec, timeoutSec, TimeUnit.MILLISECONDS);
}
public void put(ResponseFuture responseFuture) throws InterruptedException, TimeoutException {
if (semaphore.tryAcquire(timeoutSec, TimeUnit.MILLISECONDS)) {
futureMap.put(responseFuture.getId(), responseFuture);
} else {
throw new TimeoutException();
}
}
public ResponseFuture remove(String id) {
ResponseFuture future = futureMap.remove(id);
if (null != future) {
semaphore.release();
}
return future;
}
private void removeTimeoutFuture() {
futureMap.entrySet().removeIf(entry -> {
if (System.nanoTime() - entry.getValue().getTimestamp() > timeoutSec * 1_000_000L) {
try {
entry.getValue().getFuture().completeExceptionally(new TimeoutException());
}finally {
semaphore.release();
}
return true;
} else {
return false;
}
});
}
@Override
public void close() {
scheduledFuture.cancel(true);
scheduledExecutorService.shutdown();
}
}
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.ws.entity.Response;
import java.util.concurrent.CompletableFuture;
public class ResponseFuture {
private final String id;
private final CompletableFuture<Response> future;
private final long timestamp;
public ResponseFuture(String id, CompletableFuture<Response> future) {
this.id = id;
this.future = future;
timestamp = System.nanoTime();
}
public String getId() {
return id;
}
public CompletableFuture<Response> getFuture() {
return future;
}
long getTimestamp() {
return timestamp;
}
}
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.ws.entity.Request;
import com.taosdata.jdbc.ws.entity.Response;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
/**
* send message
*/
public class Transport implements AutoCloseable {
public static final int DEFAULT_MAX_REQUEST = 100;
public static final int DEFAULT_MESSAGE_WAIT_TIMEOUT = 3_000;
private final WSClient client;
private final InFlightRequest inFlightRequest;
public Transport(WSClient client, InFlightRequest inFlightRequest) {
this.client = client;
this.inFlightRequest = inFlightRequest;
}
public CompletableFuture<Response> send(Request request) {
CompletableFuture<Response> completableFuture = new CompletableFuture<>();
try {
inFlightRequest.put(new ResponseFuture(request.id(), completableFuture));
client.send(request.toString());
} catch (Throwable t) {
inFlightRequest.remove(request.id());
completableFuture.completeExceptionally(t);
}
return completableFuture;
}
public boolean isClosed() throws SQLException {
return client.isClosed();
}
@Override
public void close() throws SQLException {
client.close();
}
}
package com.taosdata.jdbc.ws;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.ws.entity.*;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.*;
public class WSClient extends WebSocketClient implements AutoCloseable {
private final String user;
private final String password;
private final String database;
private final CountDownLatch latch;
private final InFlightRequest inFlightRequest;
ThreadPoolExecutor executor;
private boolean auth;
public boolean isAuth() {
return auth;
}
/**
* create websocket connection client
*
* @param serverUri connection url
* @param user database user
* @param password database password
* @param database connection database
*/
public WSClient(URI serverUri, String user, String password, String database, InFlightRequest inFlightRequest, Map<String, String> httpHeaders, CountDownLatch latch, int maxRequest) {
super(serverUri, httpHeaders);
this.user = user;
this.password = password;
this.database = database;
this.inFlightRequest = inFlightRequest;
this.latch = latch;
executor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(maxRequest),
r -> {
Thread t = new Thread(r);
t.setName("parse-message-" + t.getId());
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
// certification
Request request = Request.generateConnect(user, password, database);
this.send(request.toString());
}
@Override
public void onMessage(String message) {
if (!"".equals(message)) {
executor.submit(() -> {
JSONObject jsonObject = JSONObject.parseObject(message);
if (Action.CONN.getAction().equals(jsonObject.getString("action"))) {
latch.countDown();
if (Code.SUCCESS.getCode() != jsonObject.getInteger("code")) {
auth = false;
this.close();
}
} else {
Response response = parseMessage(jsonObject);
ResponseFuture remove = inFlightRequest.remove(response.id());
if (null != remove) {
remove.getFuture().complete(response);
}
}
});
}
}
private Response parseMessage(JSONObject message) {
Action action = Action.of(message.getString("action"));
return message.toJavaObject(action.getResponseClazz());
}
@Override
public void onMessage(ByteBuffer bytes) {
super.onMessage(bytes);
}
@Override
public void onClose(int code, String reason, boolean remote) {
if (remote) {
throw new RuntimeException("The remote server closed the connection: " + reason);
} else {
throw new RuntimeException("close connection: " + reason);
}
}
@Override
public void onError(Exception e) {
this.close();
}
@Override
public void close() {
super.close();
executor.shutdown();
inFlightRequest.close();
}
}
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.AbstractConnection;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.rs.RestfulDatabaseMetaData;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
public class WSConnection extends AbstractConnection {
private final Transport transport;
private final DatabaseMetaData metaData;
private final String database;
private boolean fetchType;
public WSConnection(String url, Properties properties, Transport transport, String database, boolean fetchType) {
super(properties);
this.transport = transport;
this.database = database;
this.fetchType = fetchType;
this.metaData = new RestfulDatabaseMetaData(url, properties.getProperty(TSDBDriver.PROPERTY_KEY_USER), this);
}
@Override
public Statement createStatement() throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
// return new WSStatement(transport, database , fetchType);
return null;
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
if (isClosed())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
// return new WSPreparedStatement();
return null;
}
@Override
public void close() throws SQLException {
transport.close();
}
@Override
public boolean isClosed() throws SQLException {
return transport.isClosed();
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
}
return this.metaData;
}
}
package com.taosdata.jdbc.ws.entity;
import java.util.HashMap;
import java.util.Map;
/**
* request type
*/
public enum Action {
CONN("conn", ConnectResp.class),
QUERY("query", QueryResp.class),
FETCH("fetch", FetchResp.class),
FETCH_JSON("fetch_json", FetchJsonResp.class),
// fetch_block's class is meaningless
FETCH_BLOCK("fetch_block", Response.class),
;
private final String action;
private final Class<? extends Response> clazz;
Action(String action, Class<? extends Response> clazz) {
this.action = action;
this.clazz = clazz;
}
public String getAction() {
return action;
}
public Class<? extends Response> getResponseClazz() {
return clazz;
}
private static final Map<String, Action> actions = new HashMap<>();
static {
for (Action value : Action.values()) {
actions.put(value.action, value);
IdUtil.init(value.action);
}
}
public static Action of(String action) {
if (null == action || action.equals("")) {
return null;
}
return actions.get(action);
}
}
package com.taosdata.jdbc.ws.entity;
/**
* response message info
*/
public enum Code {
SUCCESS(0, "success"),
;
private final int code;
private final String message;
Code(int code, String message) {
this.code = code;
this.message = message;
}
public int getCode() {
return code;
}
public String getMessage() {
return message;
}
public static Code of(int code) {
for (Code value : Code.values()) {
if (value.code == code) {
return value;
}
}
return null;
}
}
package com.taosdata.jdbc.ws.entity;
/**
* connection result pojo
*/
public class ConnectResp extends Response {
private int code;
private String message;
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
package com.taosdata.jdbc.ws.entity;
public class FetchBlockResp {
}
package com.taosdata.jdbc.ws.entity;
public class FetchJsonResp extends Response{
private long id;
private Object[][] data;
public Object[][] getData() {
return data;
}
public void setData(Object[][] data) {
this.data = data;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
}
package com.taosdata.jdbc.ws.entity;
/**
* fetch result pojo
*/
public class FetchResp extends Response{
private int code;
private String message;
private long id;
private boolean completed;
private int[] lengths;
private int rows;
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public boolean isCompleted() {
return completed;
}
public void setCompleted(boolean completed) {
this.completed = completed;
}
public int[] getLengths() {
return lengths;
}
public void setLengths(int[] lengths) {
this.lengths = lengths;
}
public int getRows() {
return rows;
}
public void setRows(int rows) {
this.rows = rows;
}
}
\ No newline at end of file
package com.taosdata.jdbc.ws.entity;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* generate id for request
*/
public class IdUtil {
private static final Map<String, AtomicLong> ids = new HashMap<>();
public static long getId(String action) {
return ids.get(action).incrementAndGet();
}
public static void init(String action) {
ids.put(action, new AtomicLong(0));
}
}
package com.taosdata.jdbc.ws.entity;
import com.alibaba.fastjson.annotation.JSONField;
/**
* query result pojo
*/
public class QueryResp extends Response {
private int code;
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
private String message;
private long id;
@JSONField(name = "is_update")
private boolean isUpdate;
@JSONField(name = "affected_rows")
private int affectedRows;
@JSONField(name = "fields_count")
private int fieldsCount;
@JSONField(name = "fields_names")
private String[] fieldsNames;
@JSONField(name = "fields_types")
private int[] fieldsTypes;
@JSONField(name = "fields_lengths")
private int[] fieldsLengths;
private int precision;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public boolean isUpdate() {
return isUpdate;
}
public void setUpdate(boolean update) {
isUpdate = update;
}
public int getAffectedRows() {
return affectedRows;
}
public void setAffectedRows(int affectedRows) {
this.affectedRows = affectedRows;
}
public int getFieldsCount() {
return fieldsCount;
}
public void setFieldsCount(int fieldsCount) {
this.fieldsCount = fieldsCount;
}
public String[] getFieldsNames() {
return fieldsNames;
}
public void setFieldsNames(String[] fieldsNames) {
this.fieldsNames = fieldsNames;
}
public int[] getFieldsTypes() {
return fieldsTypes;
}
public void setFieldsTypes(int[] fieldsTypes) {
this.fieldsTypes = fieldsTypes;
}
public int[] getFieldsLengths() {
return fieldsLengths;
}
public void setFieldsLengths(int[] fieldsLengths) {
this.fieldsLengths = fieldsLengths;
}
public int getPrecision() {
return precision;
}
public void setPrecision(int precision) {
this.precision = precision;
}
}
package com.taosdata.jdbc.ws.entity;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONField;
/**
* send to taosadapter
*/
public class Request {
private String action;
private Payload args;
public Request(String action, Payload args) {
this.action = action;
this.args = args;
}
public String id() {
return action + "_" + args.getReqId();
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public Payload getArgs() {
return args;
}
public void setArgs(Payload args) {
this.args = args;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
public static Request generateConnect(String user, String password, String db) {
long reqId = IdUtil.getId(Action.CONN.getAction());
ConnectReq connectReq = new ConnectReq(reqId, user, password, db);
return new Request(Action.CONN.getAction(), connectReq);
}
public static Request generateQuery(String sql) {
long reqId = IdUtil.getId(Action.QUERY.getAction());
QueryReq queryReq = new QueryReq(reqId, sql);
return new Request(Action.QUERY.getAction(), queryReq);
}
public static Request generateFetch(long id) {
long reqId = IdUtil.getId(Action.FETCH.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH.getAction(), fetchReq);
}
public static Request generateFetchJson(long id) {
long reqId = IdUtil.getId(Action.FETCH_JSON.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH_JSON.getAction(), fetchReq);
}
public static Request generateFetchBlock(long id) {
long reqId = IdUtil.getId(Action.FETCH_BLOCK.getAction());
FetchReq fetchReq = new FetchReq(reqId, id);
return new Request(Action.FETCH_BLOCK.getAction(), fetchReq);
}
}
class Payload {
@JSONField(name = "req_id")
private final long reqId;
public Payload(long reqId) {
this.reqId = reqId;
}
public long getReqId() {
return reqId;
}
}
class ConnectReq extends Payload {
private String user;
private String password;
private String db;
public ConnectReq(long reqId, String user, String password, String db) {
super(reqId);
this.user = user;
this.password = password;
this.db = db;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getDb() {
return db;
}
public void setDb(String db) {
this.db = db;
}
}
class QueryReq extends Payload {
private String sql;
public QueryReq(long reqId, String sql) {
super(reqId);
this.sql = sql;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
}
class FetchReq extends Payload {
private long id;
public FetchReq(long reqId, long id) {
super(reqId);
this.id = id;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
}
\ No newline at end of file
package com.taosdata.jdbc.ws.entity;
import com.alibaba.fastjson.annotation.JSONField;
/**
* return from taosadapter
*/
public class Response {
private String action;
@JSONField(name = "req_id")
private long reqId;
public String id() {
return action + "_" + reqId;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public long getReqId() {
return reqId;
}
public void setReqId(long reqId) {
this.reqId = reqId;
}
}
\ No newline at end of file
package com.taosdata.jdbc.ws;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.annotation.CatalogRunner;
import com.taosdata.jdbc.annotation.Description;
import com.taosdata.jdbc.annotation.TestTarget;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.sql.*;
import java.util.Properties;
/**
* You need to start taosadapter before testing this method
*/
@Ignore
@RunWith(CatalogRunner.class)
@TestTarget(alias = "test connection with server", author = "huolibo",version = "2.0.37")
public class WSConnectionTest {
private static final String host = "192.168.1.98";
private static final int port = 6041;
private Connection connection;
@Test
@Description("normal test with websocket server")
public void normalConection() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
}
@Test
@Description("url has no db")
public void withoutDBConection() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/?user=root&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
}
@Test
@Description("user and password in property")
public void propertyUserPassConection() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, "root");
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD,"taosdata");
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
}
@Test
// @Test(expected = SQLException.class)
@Description("wrong password or user")
public void wrongUserOrPasswordConection() throws SQLException {
String url = "jdbc:TAOS-RS://" + host + ":" + port + "/test?user=abc&password=taosdata";
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
connection = DriverManager.getConnection(url, properties);
}
}
......@@ -276,6 +276,11 @@
"which": "^2.0.1"
}
},
"data-uri-to-buffer": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/data-uri-to-buffer/-/data-uri-to-buffer-4.0.0.tgz",
"integrity": "sha512-Vr3mLBA8qWmcuschSLAOogKgQ/Jwxulv3RNE4FXnYWRGujzrRWQI4m12fQqRkwX06C0KanhLr4hK+GydchZsaA=="
},
"debug": {
"version": "4.3.2",
"resolved": "https://registry.nlark.com/debug/download/debug-4.3.2.tgz?cache=0&other_urls=https%3A%2F%2Fregistry.nlark.com%2Fdebug%2Fdownload%2Fdebug-4.3.2.tgz",
......@@ -549,6 +554,15 @@
"integrity": "sha1-PYpcZog6FqMMqGQ+hR8Zuqd5eRc=",
"dev": true
},
"fetch-blob": {
"version": "3.1.4",
"resolved": "https://registry.npmjs.org/fetch-blob/-/fetch-blob-3.1.4.tgz",
"integrity": "sha512-Eq5Xv5+VlSrYWEqKrusxY1C3Hm/hjeAsCGVG3ft7pZahlUAChpGZT/Ms1WmSLnEAisEXszjzu/s+ce6HZB2VHA==",
"requires": {
"node-domexception": "^1.0.0",
"web-streams-polyfill": "^3.0.3"
}
},
"file-entry-cache": {
"version": "6.0.1",
"resolved": "https://registry.npm.taobao.org/file-entry-cache/download/file-entry-cache-6.0.1.tgz?cache=0&sync_timestamp=1613794546707&other_urls=https%3A%2F%2Fregistry.npm.taobao.org%2Ffile-entry-cache%2Fdownload%2Ffile-entry-cache-6.0.1.tgz",
......@@ -580,6 +594,14 @@
"integrity": "sha1-C+4AUBiusmDQo6865ljdATbsG5k=",
"dev": true
},
"formdata-polyfill": {
"version": "4.0.10",
"resolved": "https://registry.npmjs.org/formdata-polyfill/-/formdata-polyfill-4.0.10.tgz",
"integrity": "sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==",
"requires": {
"fetch-blob": "^3.1.2"
}
},
"fs.realpath": {
"version": "1.0.0",
"resolved": "https://registry.nlark.com/fs.realpath/download/fs.realpath-1.0.0.tgz",
......@@ -968,10 +990,20 @@
"integrity": "sha1-Sr6/7tdUHywnrPspvbvRXI1bpPc=",
"dev": true
},
"node-domexception": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/node-domexception/-/node-domexception-1.0.0.tgz",
"integrity": "sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ=="
},
"node-fetch": {
"version": "2.6.2",
"resolved": "https://registry.nlark.com/node-fetch/download/node-fetch-2.6.2.tgz?cache=0&sync_timestamp=1630935314150&other_urls=https%3A%2F%2Fregistry.nlark.com%2Fnode-fetch%2Fdownload%2Fnode-fetch-2.6.2.tgz",
"integrity": "sha1-mGmWgYtzeF5HsZZcw06wk6HUZNA="
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-3.1.1.tgz",
"integrity": "sha512-SMk+vKgU77PYotRdWzqZGTZeuFKlsJ0hu4KPviQKkfY+N3vn2MIzr0rvpnYpR8MtB3IEuhlEcuOLbGvLRlA+yg==",
"requires": {
"data-uri-to-buffer": "^4.0.0",
"fetch-blob": "^3.1.3",
"formdata-polyfill": "^4.0.10"
}
},
"object-inspect": {
"version": "1.11.0",
......@@ -1331,6 +1363,11 @@
"integrity": "sha1-LeGWGMZtwkfc+2+ZM4A12CRaLO4=",
"dev": true
},
"web-streams-polyfill": {
"version": "3.2.0",
"resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.2.0.tgz",
"integrity": "sha512-EqPmREeOzttaLRm5HS7io98goBgZ7IVz79aDvqjD0kYXLtFZTc0T/U6wHTPKyIjb+MdN7DFIIX6hgdBEpWmfPA=="
},
"which": {
"version": "2.0.2",
"resolved": "https://registry.npm.taobao.org/which/download/which-2.0.2.tgz",
......
......@@ -18,6 +18,6 @@
"assert": "^2.0.0"
},
"dependencies": {
"node-fetch": "^2.x"
"node-fetch": "^3.x"
}
}
Subproject commit a6531c1528b5b50fb234e2519106feae91cc8d38
Subproject commit 18916a1719fdfcefe1ed1d4ce0049f36c3ac4796
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"telnet_tcp_port": 6046,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_pool_size": 20,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"prepared_rand": 10,
"chinese": "no",
"insert_interval": 0,
"num_of_records_per_req": 10,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 36500,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb1",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb1_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "sml-rest",
"line_protocol": "telnet",
"tcp_transfer": "yes",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 20,
"insert_interval": 0,
"interlace_rows": 0,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "INT"}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "UINT"}, {"type": "UBIGINT"}, {"type": "UTINYINT"}, {"type": "USMALLINT"}, {"type": "BINARY", "len": 19, "count":1}]
},{
"name": "stb2",
"child_table_exists":"no",
"childtable_count": 8,
"childtable_prefix": "stb2_",
"escape_character": "no",
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "sml-rest",
"line_protocol": "telnet",
"tcp_transfer": "yes",
"childtable_limit": 0,
"childtable_offset": 0,
"insert_rows": 20,
"insert_interval": 0,
"interlace_rows": 5,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "now",
"sample_file": "",
"use_sample_ts": "no",
"tags_file": "",
"columns": [{"type": "INT"}],
"tags": [{"type": "INT"}, {"type": "BIGINT"}, {"type": "FLOAT"}, {"type": "DOUBLE"}, {"type": "SMALLINT"}, {"type": "TINYINT"}, {"type": "BOOL"}, {"type": "NCHAR","len": 17, "count":1}, {"type": "UINT"}, {"type": "UBIGINT"}, {"type": "UTINYINT"}, {"type": "USMALLINT"}, {"type": "BINARY", "len": 19, "count":1}]
}]
}]
}
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def caseDescription(self):
'''
[TD-11510] taosBenchmark test cases
'''
return
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
cmd = "taosBenchmark -f ./5-taos-tools/taosbenchmark/json/sml_telnet_tcp.json"
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
tdSql.execute("reset query cache")
tdSql.query("select count(tbname) from opentsdb_telnet.stb1")
tdSql.checkData(0, 0, 8)
tdSql.query("select count(*) from opentsdb_telnet.stb1")
tdSql.checkData(0, 0, 160)
tdSql.query("select count(tbname) from opentsdb_telnet.stb2")
tdSql.checkData(0, 0, 8)
tdSql.query("select count(*) from opentsdb_telnet.stb2")
tdSql.checkData(0, 0, 160)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
......@@ -21,4 +21,7 @@ IF (TD_DARWIN)
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread ${LINK_LUA})
ADD_EXECUTABLE(epoll epoll.c)
TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread ${LINK_LUA})
ADD_EXECUTABLE(parameter-binding parameter-binding.c)
TARGET_LINK_LIBRARIES(parameter-binding taos)
ENDIF ()
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <taos.h>
#include <time.h>
bool isPrint = true;
void one_batch_one_table_1(TAOS *conn, long totalRows, long batchRows);
void one_batch_one_table_2(TAOS *conn, long totalRows, long batchRows);
void one_batch_one_table_3(TAOS *conn, long totalRows, long batchRows);
void one_batch_one_table_4(TAOS *conn, long totalRows, long batchRows);
void one_batch_one_table_5(TAOS *conn, long totalRows, long batchRows);
void one_batch_one_table_6(TAOS *conn, long totalRows, long batchRows);
void one_batch_one_table_7(TAOS *conn, long totalRows, long batchRows);
void one_batch_multi_table_1(TAOS *conn, long totalRows, long batchRows, int tables);
void one_batch_multi_table_2(TAOS *conn, long totalRows, long batchRows, int tables);
void one_batch_multi_table_3(TAOS *conn, long totalRows, long batchRows, int tables);
void execute(TAOS *conn, char *sql);
void prepare_normal_table(TAOS *conn);
void prepare_super_and_sub_table(TAOS *conn, int subTables);
void prepare_super_table(TAOS *conn, int subTables);
int64_t getCurrentTimeMill();
TAOS_STMT *A(TAOS *);
void B(TAOS_STMT *stmt, char sql[]);
void C(TAOS_STMT *stmt, char sql[]);
void D(TAOS_STMT *stmt, char sql[], int tag);
void E(TAOS_STMT *stmt);
void F(TAOS_STMT *stmt, int64_t ts_start);
void G1(TAOS_STMT *stmt, int64_t ts_start, int rows);
void G2(TAOS_STMT *stmt, int rows);
void H(TAOS_STMT *stmt, int64_t ts_start, int rows);
void I(TAOS_STMT *stmt);
void J(TAOS_STMT *stmt);
void L(TAOS_STMT *stmt);
int main() {
char host[] = "192.168.56.105";
srand(time(NULL));
// connect
TAOS *conn = taos_connect(host, "root", "taosdata", NULL, 0);
if (conn == NULL) {
printf("failed to connect to:%s, reason:%s\n", host, "null taos");
exit(-1);
}
execute(conn, "drop database if exists test");
execute(conn, "create database if not exists test");
execute(conn, "use test");
long totalRows = 1000000;
long batchRows = 32767;
int tables = 10;
prepare_super_table(conn, 1);
// A -> B -> D -> [F -> I]... -> J -> L
// one_batch_one_table_1(conn, totalRows, batchRows);
// A -> B -> [D -> [F -> I]... -> J]... -> L
// one_batch_one_table_2(conn, totalRows, batchRows);
// A -> B -> D -> [F... -> I -> J]... -> L
// one_batch_one_table_3(conn, totalRows, batchRows);
// A -> B -> D -> [H -> I -> J]... -> L
// one_batch_one_table_4(conn, totalRows, batchRows);
// A -> B -> [D -> H -> I -> J]... -> L
// one_batch_one_table_5(conn, totalRows, batchRows);
// A -> B -> [D -> H -> I -> J]... -> L
// one_batch_one_table_6(conn, totalRows, batchRows);
// A -> B -> [D -> H -> I -> J]... -> L
// one_batch_one_table_7(conn, totalRows, batchRows);
// A -> B -> [D -> [F -> I]... -> J]... -> L
// one_batch_multi_table_1(conn, totalRows, batchRows, tables);
// A -> B -> [D -> H -> I -> J]... -> L
// one_batch_multi_table_2(conn, totalRows, batchRows, tables);
// A -> B -> [D -> G1 -> G2 -> I -> J]... -> L
one_batch_multi_table_3(conn, totalRows, batchRows, tables);
// close
taos_close(conn);
taos_cleanup();
exit(0);
}
// A -> B -> D -> [F -> I]... -> J -> L
void one_batch_one_table_1(TAOS *conn, long totalRows, long batchRows) {
// given
time_t current;
time(&current);
current -= totalRows;
int64_t start = getCurrentTimeMill();
// when
TAOS_STMT *stmt = A(conn);
B(stmt, "insert into ? using weather tags(?) (ts, f1) values(?, ?)");
D(stmt, "t1", 1);
for (int i = 1; i <= totalRows; ++i) {
F(stmt, (current + i - 1) * 1000);
I(stmt);
if (i % batchRows == 0 || i == totalRows) {
J(stmt);
}
}
L(stmt);
int64_t end = getCurrentTimeMill();
printf("totalRows: %ld, batchRows: %ld, time cost: %lld ms\n", totalRows, batchRows, (end - start));
}
// A -> B -> D -> [F -> I]... -> J -> L
void one_batch_one_table_2(TAOS *conn, long totalRows, long batchRows) {
// given
time_t current;
time(&current);
current -= totalRows;
int64_t start = getCurrentTimeMill();
// when
TAOS_STMT *stmt = A(conn);
B(stmt, "insert into ? using weather tags(?) (ts, f1) values(?, ?)");
for (int i = 1; i <= totalRows; ++i) {
if (i % batchRows == 1) {
D(stmt, "t1", 1);
}
F(stmt, (current + i - 1) * 1000);
I(stmt);
if (i % batchRows == 0 || i == totalRows) {
J(stmt);
}
}
L(stmt);
int64_t end = getCurrentTimeMill();
printf("totalRows: %ld, batchRows: %ld, time cost: %lld ms\n", totalRows, batchRows, (end - start));
}
void one_batch_one_table_3(TAOS *conn, long totalRows, long batchRows) {
// given
time_t current;
time(&current);
current -= totalRows;
int64_t start = getCurrentTimeMill();
// when
TAOS_STMT *stmt = A(conn);
B(stmt, "insert into ? using weather tags(?) (ts, f1) values(?, ?)");
D(stmt, "t1", 1);
for (int i = 1; i <= totalRows; ++i) {
F(stmt, (current + i - 1) * 1000);
if (i % batchRows == 0 || i == totalRows) {
I(stmt);
J(stmt);
}
}
L(stmt);
int64_t end = getCurrentTimeMill();
printf("totalRows: %ld, batchRows: %ld, time cost: %lld ms\n", totalRows, batchRows, (end - start));
}
void one_batch_one_table_4(TAOS *conn, long totalRows, long batchRows) {
// given
time_t current;
time(&current);
current -= totalRows;
int64_t start = getCurrentTimeMill();
// when
TAOS_STMT *stmt = A(conn);
B(stmt, "insert into ? using weather tags(?) values(?,?)");
D(stmt, "t1", 1);
for (int i = 1; i <= totalRows; i += batchRows) {
int rows = (i + batchRows) > totalRows ? (totalRows + 1 - i) : batchRows;
H(stmt, (current + i) * 1000, rows);
I(stmt);
J(stmt);
}
L(stmt);
int64_t end = getCurrentTimeMill();
printf("totalRows: %ld, batchRows: %ld, time cost: %lld ms\n", totalRows, batchRows, (end - start));
}
void one_batch_one_table_5(TAOS *conn, long totalRows, long batchRows) {
// given
time_t current;
time(&current);
current -= totalRows;
int64_t start = getCurrentTimeMill();
// when
TAOS_STMT *stmt = A(conn);
B(stmt, "insert into ? using weather tags(?) values(?,?)");
for (int i = 1; i <= totalRows; i += batchRows) {
D(stmt, "t1", 1);
int rows = (i + batchRows) > totalRows ? (totalRows + 1 - i) : batchRows;
H(stmt, (current + i) * 1000, rows);
I(stmt);
J(stmt);
}
L(stmt);
int64_t end = getCurrentTimeMill();
printf("totalRows: %ld, batchRows: %ld, time cost: %lld ms\n", totalRows, batchRows, (end - start));
}
void one_batch_one_table_6(TAOS *conn, long totalRows, long batchRows) {
// given
time_t current;
time(&current);
current -= totalRows;
int64_t start = getCurrentTimeMill();
// when
TAOS_STMT *stmt = A(conn);
B(stmt, "insert into ? using weather tags(?) values(?,?)");
D(stmt, "t1", 1);
for (int i = 1; i <= totalRows; i += batchRows) {
int rows = (i + batchRows) > totalRows ? (totalRows + 1 - i) : batchRows;
G1(stmt, (current + i) * 1000, rows);
G2(stmt, rows);
I(stmt);
J(stmt);
}
L(stmt);
int64_t end = getCurrentTimeMill();
printf("totalRows: %ld, batchRows: %ld, time cost: %lld ms\n", totalRows, batchRows, (end - start));
}
void one_batch_one_table_7(TAOS *conn, long totalRows, long batchRows) {
// given
time_t current;
time(&current);
current -= totalRows;
int64_t start = getCurrentTimeMill();
// when
TAOS_STMT *stmt = A(conn);
B(stmt, "insert into ? using weather tags(?) values(?,?)");
for (int i = 1; i <= totalRows; i += batchRows) {
if (i % batchRows == 1) {
D(stmt, "t1", 1);
}
int rows = (i + batchRows) > totalRows ? (totalRows + 1 - i) : batchRows;
G1(stmt, (current + i) * 1000, rows);
G2(stmt, rows);
I(stmt);
J(stmt);
}
L(stmt);
int64_t end = getCurrentTimeMill();
printf("totalRows: %ld, batchRows: %ld, time cost: %lld ms\n", totalRows, batchRows, (end - start));
}
void one_batch_multi_table_1(TAOS *conn, long totalRows, long batchRows, int tables) {
// given
time_t current;
time(&current);
long eachTable = (totalRows - 1) / tables + 1;
current -= eachTable;
int64_t start = getCurrentTimeMill();
// when
TAOS_STMT *stmt = A(conn);
B(stmt, "insert into ? using weather tags(?) values(?, ?)");
for (int tbIndex = 0; tbIndex < tables; ++tbIndex) {
char tbname[10];
sprintf(tbname, "t%d", tbIndex);
eachTable = ((tbIndex + 1) * eachTable > totalRows) ? (totalRows - tbIndex * eachTable) : eachTable;
for (int rowIndex = 1; rowIndex <= eachTable; ++rowIndex) {
if (rowIndex % batchRows == 1) {
D(stmt, tbname, tbIndex);
if (isPrint)
printf("\ntbIndex: %d, table_rows: %ld, rowIndex: %d, batch_rows: %ld\n", tbIndex, eachTable, rowIndex,
batchRows);
}
F(stmt, (current + rowIndex - 1) * 1000);
I(stmt);
if (rowIndex % batchRows == 0 || rowIndex == eachTable) {
J(stmt);
}
}
}
L(stmt);
int64_t end = getCurrentTimeMill();
printf("totalRows: %ld, batchRows: %ld, table: %d, eachTableRows: %ld, time cost: %lld ms\n", totalRows, batchRows,
tables, eachTable, (end - start));
}
void one_batch_multi_table_2(TAOS *conn, long totalRows, long batchRows, int tables) {
// given
time_t current;
time(&current);
long eachTable = (totalRows - 1) / tables + 1;
current -= eachTable;
int64_t start = getCurrentTimeMill();
// when
TAOS_STMT *stmt = A(conn);
B(stmt, "insert into ? using weather tags(?) values(?,?)");
for (int tbIndex = 0; tbIndex < tables; ++tbIndex) {
char tbname[10];
sprintf(tbname, "t%d", tbIndex);
eachTable = ((tbIndex + 1) * eachTable > totalRows) ? (totalRows - tbIndex * eachTable) : eachTable;
for (int rowIndex = 1; rowIndex <= eachTable; rowIndex += batchRows) {
int rows = (rowIndex + batchRows) > eachTable ? (eachTable + 1 - rowIndex) : batchRows;
if (rowIndex % batchRows == 1) {
D(stmt, tbname, tbIndex);
if (isPrint)
printf("\ntbIndex: %d, table_rows: %ld, rowIndex: %d, batch_rows: %d\n", tbIndex, eachTable, rowIndex, rows);
}
H(stmt, (current + rowIndex) * 1000, rows);
I(stmt);
J(stmt);
}
}
L(stmt);
int64_t end = getCurrentTimeMill();
printf("totalRows: %ld, batchRows: %ld, table: %d, eachTableRows: %ld, time cost: %lld ms\n", totalRows, batchRows,
tables, eachTable, (end - start));
}
void one_batch_multi_table_3(TAOS *conn, long totalRows, long batchRows, int tables) {
// given
time_t current;
time(&current);
long eachTable = (totalRows - 1) / tables + 1;
current -= eachTable;
int64_t start = getCurrentTimeMill();
// when
TAOS_STMT *stmt = A(conn);
B(stmt, "insert into ? using weather tags(?) values(?, ?)");
for (int tbIndex = 0; tbIndex < tables; ++tbIndex) {
char tbname[10];
sprintf(tbname, "t%d", tbIndex);
eachTable = ((tbIndex + 1) * eachTable > totalRows) ? (totalRows - tbIndex * eachTable) : eachTable;
for (int rowIndex = 1; rowIndex <= eachTable; rowIndex += batchRows) {
int rows = (rowIndex + batchRows) > eachTable ? (eachTable + 1 - rowIndex) : batchRows;
if (rowIndex % batchRows == 1) {
D(stmt, tbname, tbIndex);
if (isPrint)
printf("\ntbIndex: %d, table_rows: %ld, rowIndex: %d, batch_rows: %d\n", tbIndex, eachTable, rowIndex, rows);
}
G1(stmt, (current + rowIndex) * 1000, rows);
G2(stmt, rows);
I(stmt);
J(stmt);
}
}
L(stmt);
int64_t end = getCurrentTimeMill();
printf("totalRows: %ld, batchRows: %ld, table: %d, eachTableRows: %ld, time cost: %lld ms\n", totalRows, batchRows,
tables, eachTable, (end - start));
}
void execute(TAOS *conn, char *sql) {
TAOS_RES *psql = taos_query(conn, sql);
if (psql == NULL) {
printf("failed to execute: %s, reason: %s\n", sql, taos_errstr(psql));
taos_free_result(psql);
taos_close(conn);
exit(-1);
}
taos_free_result(psql);
}
TAOS_STMT *A(TAOS *conn) {
if (isPrint) printf("A -> ");
return taos_stmt_init(conn);
}
void B(TAOS_STMT *stmt, char sql[]) {
if (isPrint) printf("B -> ");
int code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0) {
printf("failed to prepare stmt: %s, reason: %s\n", sql, taos_stmt_errstr(stmt));
return;
}
}
void C(TAOS_STMT *stmt, char tbname[]) {
if (isPrint) printf("C -> ");
int code = taos_stmt_set_tbname(stmt, tbname);
if (code != 0) printf("failed to set_tbname_tags, reason: %s\n", taos_stmt_errstr(stmt));
}
void D(TAOS_STMT *stmt, char tbname[], int tag) {
if (isPrint) printf("D -> ");
TAOS_BIND tags[1];
tags[0].buffer_type = TSDB_DATA_TYPE_INT;
int tag_value = tag >= 0 ? tag : rand() % 100;
tags[0].buffer = &tag_value;
tags[0].buffer_length = sizeof(tag_value);
tags[0].length = &tags[0].buffer_length;
tags[0].is_null = NULL;
// set_tbname_tags
int code = taos_stmt_set_tbname_tags(stmt, tbname, tags);
if (code != 0) printf("failed to set_tbname_tags, reason: %s\n", taos_stmt_errstr(stmt));
}
void E(TAOS_STMT *stmt) {
// TODO
}
void F(TAOS_STMT *stmt, int64_t ts) {
if (isPrint) printf("F -> ");
TAOS_BIND params[2];
// timestamp
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer = &ts;
params[0].buffer_length = sizeof(ts);
params[0].length = &params[0].buffer_length;
params[0].is_null = NULL;
// int
int value = rand() % 100;
params[1].buffer_type = TSDB_DATA_TYPE_INT;
params[1].buffer = &value;
params[1].buffer_length = sizeof(value);
params[1].length = &params[1].buffer_length;
params[1].is_null = NULL;
// bind
int code = taos_stmt_bind_param(stmt, params);
if (0 != code) printf("failed to bind_param, reason: %s\n", taos_stmt_errstr(stmt));
}
void H(TAOS_STMT *stmt, int64_t ts_start, int rows) {
if (isPrint) printf("H -> ");
TAOS_MULTI_BIND params[2];
// timestamp
int64_t ts[rows];
for (int i = 0; i < rows; ++i) {
ts[i] = ts_start + i * 1000;
}
params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
params[0].buffer = ts;
params[0].buffer_length = sizeof(ts[0]);
params[0].length = malloc(sizeof(int64_t) * rows);
char is_null[rows];
for (int i = 0; i < rows; i++) {
is_null[i] = 0;
}
params[0].is_null = is_null;
params[0].num = rows;
// f1
int32_t values[rows];
for (int i = 0; i < rows; ++i) {
values[i] = rand() % 100;
}
params[1].buffer_type = TSDB_DATA_TYPE_INT;
params[1].buffer = values;
params[1].buffer_length = sizeof(int32_t);
params[1].length = malloc(sizeof(int32_t) * rows);
params[1].is_null = is_null;
params[1].num = rows;
int code = taos_stmt_bind_param_batch(stmt, params);
if (code != 0) {
printf("failed to bind_param_batch, reason: %s\n", taos_stmt_errstr(stmt));
return;
}
}
void G1(TAOS_STMT *stmt, int64_t ts_start, int rows) {
if (isPrint) printf("G1 -> ");
// timestamp
TAOS_MULTI_BIND param0[1];
int64_t ts[rows];
for (int i = 0; i < rows; ++i) {
ts[i] = ts_start + i * 1000;
}
param0[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
param0[0].buffer = ts;
param0[0].buffer_length = sizeof(ts[0]);
param0[0].length = malloc(sizeof(int64_t) * rows);
char is_null[rows];
for (int i = 0; i < rows; i++) {
is_null[i] = 0;
}
param0[0].is_null = is_null;
param0[0].num = rows;
int code = taos_stmt_bind_single_param_batch(stmt, param0, 0);
if (code != 0) {
printf("failed to bind_single_param_batch, reason: %s\n", taos_stmt_errstr(stmt));
return;
}
}
void G2(TAOS_STMT *stmt, int rows) {
if (isPrint) printf("G2 -> ");
// f1
TAOS_MULTI_BIND param1[1];
int32_t values[rows];
for (int i = 0; i < rows; ++i) {
values[i] = rand() % 100;
}
param1[0].buffer_type = TSDB_DATA_TYPE_INT;
param1[0].buffer = values;
param1[0].buffer_length = sizeof(int32_t);
param1[0].length = malloc(sizeof(int32_t) * rows);
char is_null[rows];
for (int i = 0; i < rows; i++) {
is_null[i] = 0;
}
param1[0].is_null = is_null;
param1[0].num = rows;
int code = taos_stmt_bind_single_param_batch(stmt, param1, 1);
if (code != 0) {
printf("failed to bind_single_param_batch, reason: %s\n", taos_stmt_errstr(stmt));
return;
}
}
void I(TAOS_STMT *stmt) {
if (isPrint) printf("I -> ");
int code = taos_stmt_add_batch(stmt);
if (code != 0) {
printf("failed to add_batch, reason: %s\n", taos_stmt_errstr(stmt));
return;
}
}
void J(TAOS_STMT *stmt) {
if (isPrint) printf("J -> ");
int code = taos_stmt_execute(stmt);
if (code != 0) {
printf("failed to execute, reason: %s\n", taos_stmt_errstr(stmt));
return;
}
}
void L(TAOS_STMT *stmt) {
if (isPrint) printf("L\n");
taos_stmt_close(stmt);
}
void prepare_super_table(TAOS *conn, int subTables) {
char sql[100] = "drop table weather";
execute(conn, sql);
if (isPrint) printf("sql>>> %s\n", sql);
sprintf(sql, "create table weather(ts timestamp, f1 int) tags(t1 int)");
execute(conn, sql);
if (isPrint) printf("sql>>> %s\n", sql);
for (int i = 0; i < subTables; i++) {
sprintf(sql, "drop table t%d", i);
if (isPrint) printf("sql>>> %s\n", sql);
execute(conn, sql);
}
}
void prepare_normal_table(TAOS *conn) {
execute(conn, "drop table weather");
execute(conn, "create table weather(ts timestamp, f1 int) tags(t1 int)");
}
void prepare_super_and_sub_table(TAOS *conn, int subTables) {
execute(conn, "drop table weather");
execute(conn, "create table weather(ts timestamp, f1 int) tags(t1 int)");
for (int i = 0; i < subTables; i++) {
char sql[100];
sprintf(sql, "drop table t%d", i);
if (isPrint) printf("sql>>> %s\n", sql);
execute(conn, sql);
sprintf(sql, "create table t%d using weather tags(%d)", i, i);
if (isPrint) printf("sql>>> %s\n", sql);
execute(conn, sql);
}
}
int64_t getCurrentTimeMill() {
struct timeval tv;
gettimeofday(&tv, NULL);
return ((unsigned long long)tv.tv_sec * 1000 + (unsigned long long)tv.tv_usec / 1000);
}
\ No newline at end of file
......@@ -287,8 +287,9 @@ class TDDnode:
print(cmd)
taosadapterCmd = "nohup %s > /dev/null 2>&1 & " % (
taosadapterCmd = "nohup %s --opentsdb_telnet.enable=true > /dev/null 2>&1 & " % (
taosadapterBinPath)
tdLog.info(taosadapterCmd)
if os.system(taosadapterCmd) != 0:
tdLog.exit(taosadapterCmd)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册