未验证 提交 d5ac6029 编写于 作者: Z Zhiyu Yang 提交者: GitHub

[TS-572]<fix>: fix noHttpResponseException in httpClient (#8583)

上级 5278870c
......@@ -5,7 +5,7 @@
......@@ -3,7 +3,7 @@
......@@ -58,6 +58,13 @@
......@@ -70,6 +77,18 @@
......@@ -135,7 +135,6 @@ public class TSDBDriver extends AbstractDriver {
return new TSDBConnection(props, this.dbMetaData);
} catch (SQLWarning sqlWarning) {
return new TSDBConnection(props, this.dbMetaData);
} catch (SQLException sqlEx) {
throw sqlEx;
......@@ -50,9 +50,13 @@ public class RestfulDriver extends AbstractDriver {
String password = URLEncoder.encode(props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD), StandardCharsets.UTF_8.displayName());
loginUrl = "http://" + props.getProperty(TSDBDriver.PROPERTY_KEY_HOST) + ":" + props.getProperty(TSDBDriver.PROPERTY_KEY_PORT) + "/rest/login/" + user + "/" + password + "";
} catch (UnsupportedEncodingException e) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE, "unsupported UTF-8 concoding, user: " + props.getProperty(TSDBDriver.PROPERTY_KEY_USER) + ", password: " + props.getProperty(TSDBDriver.PROPERTY_KEY_PASSWORD));
int poolSize = Integer.valueOf(props.getProperty("httpPoolSize", HttpClientPoolUtil.DEFAULT_MAX_PER_ROUTE));
boolean keepAlive = Boolean.valueOf(props.getProperty("httpKeepAlive", HttpClientPoolUtil.DEFAULT_HTTP_KEEP_ALIVE));
HttpClientPoolUtil.init(poolSize, keepAlive);
String result = HttpClientPoolUtil.execute(loginUrl);
JSONObject jsonResult = JSON.parseObject(result);
String status = jsonResult.getString("status");
......@@ -9,6 +9,7 @@ import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
......@@ -22,15 +23,17 @@ import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
public class HttpClientPoolUtil {
private static final String DEFAULT_CONTENT_TYPE = "application/json";
private static final int DEFAULT_MAX_RETRY_COUNT = 5;
private static final int DEFAULT_MAX_TOTAL = 50;
private static final int DEFAULT_MAX_PER_ROUTE = 5;
public static final String DEFAULT_HTTP_KEEP_ALIVE = "true";
public static final String DEFAULT_MAX_PER_ROUTE = "20";
private static final int DEFAULT_HTTP_KEEP_TIME = -1;
private static String isKeepAlive;
private static final ConnectionKeepAliveStrategy DEFAULT_KEEP_ALIVE_STRATEGY = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
......@@ -48,37 +51,41 @@ public class HttpClientPoolUtil {
private static final CloseableHttpClient httpClient;
private static CloseableHttpClient httpClient;
static {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
httpClient = HttpClients.custom()
.setRetryHandler((exception, executionCount, httpContext) -> executionCount < DEFAULT_MAX_RETRY_COUNT)
public static void init(Integer connPoolSize, boolean keepAlive) {
if (httpClient == null) {
synchronized (HttpClientPoolUtil.class) {
if (httpClient == null) {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(connPoolSize * 10);
httpClient = HttpClients.custom()
.setRetryHandler((exception, executionCount, httpContext) -> executionCount < DEFAULT_MAX_RETRY_COUNT)
/*** execute GET request ***/
public static String execute(String uri) throws SQLException {
HttpEntity httpEntity = null;
String responseBody = "";
try {
HttpRequestBase method = getRequest(uri, HttpGet.METHOD_NAME);
HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context);
HttpRequestBase method = getRequest(uri, HttpGet.METHOD_NAME);
HttpContext context = HttpClientContext.create();
try (CloseableHttpResponse httpResponse = httpClient.execute(method, context)) {
httpEntity = httpResponse.getEntity();
if (httpEntity != null) {
responseBody = EntityUtils.toString(httpEntity, StandardCharsets.UTF_8);
} catch (ClientProtocolException e) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_Protocol_Exception, e.getMessage());
} catch (IOException exception) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_IOException, exception.getMessage());
} finally {
if (httpEntity != null) {
......@@ -88,30 +95,27 @@ public class HttpClientPoolUtil {
return responseBody;
/*** execute POST request ***/
public static String execute(String uri, String data, String token) throws SQLException {
HttpEntityEnclosingRequestBase method = (HttpEntityEnclosingRequestBase) getRequest(uri, HttpPost.METHOD_NAME);
method.setHeader(HTTP.CONTENT_TYPE, "text/plain");
method.setHeader(HTTP.CONN_DIRECTIVE, isKeepAlive);
method.setHeader("Authorization", "Taosd " + token);
method.setEntity(new StringEntity(data, StandardCharsets.UTF_8));
HttpContext context = HttpClientContext.create();
HttpEntity httpEntity = null;
String responseBody = "";
try {
HttpEntityEnclosingRequestBase method = (HttpEntityEnclosingRequestBase) getRequest(uri, HttpPost.METHOD_NAME);
method.setHeader(HTTP.CONTENT_TYPE, "text/plain");
method.setHeader("Authorization", "Taosd " + token);
method.setEntity(new StringEntity(data, StandardCharsets.UTF_8));
HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context);
try (CloseableHttpResponse httpResponse = httpClient.execute(method, context)) {
httpEntity = httpResponse.getEntity();
if (httpEntity == null) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_HTTP_ENTITY_IS_NULL, "httpEntity is null, sql: " + data);
responseBody = EntityUtils.toString(httpEntity, StandardCharsets.UTF_8);
} catch (ClientProtocolException e) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_Protocol_Exception, e.getMessage());
} catch (IOException exception) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_IOException, exception.getMessage());
} finally {
if (httpEntity != null) {
......@@ -142,4 +146,12 @@ public class HttpClientPoolUtil {
return method;
public static void reset() {
synchronized (HttpClientPoolUtil.class) {
ClientConnectionManager cm = httpClient.getConnectionManager();
cm.closeIdleConnections(100, TimeUnit.MILLISECONDS);
\ No newline at end of file
......@@ -16,7 +16,6 @@ public class TaosInfo implements TaosInfoMBean {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("TaosInfoMBean:name=TaosInfo");
server.registerMBean(TaosInfo.getInstance(), name);
} catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
......@@ -49,14 +49,9 @@ public class Utils {
try {
return parseMicroSecTimestamp(timeStampStr);
} catch (DateTimeParseException ee) {
try {
return parseNanoSecTimestamp(timeStampStr);
} catch (DateTimeParseException eee) {
return parseNanoSecTimestamp(timeStampStr);
return null;
private static LocalDateTime parseMilliSecTimestamp(String timeStampStr) throws DateTimeParseException {
package com.taosdata.jdbc.rs;
import org.junit.Assert;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class HttpKeepAliveTest {
private static final String host = "";
public void test() throws SQLException {
int multi = 4000;
AtomicInteger exceptionCount = new AtomicInteger();
Properties props = new Properties();
props.setProperty("httpKeepAlive", "false");
props.setProperty("httpPoolSize", "20");
Connection connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata", props);
List<Thread> threads = IntStream.range(0, multi).mapToObj(i -> new Thread(
() -> {
try (Statement stmt = connection.createStatement()) {
stmt.execute("insert into log.tb_not_exists values(now, 1)");
stmt.execute("select last(*) from log.dn");
} catch (SQLException throwables) {
for (Thread thread : threads) {
try {
} catch (InterruptedException e) {
Assert.assertEquals(multi, exceptionCount.get());
......@@ -6,8 +6,7 @@ import java.sql.*;
public class WasNullTest {
// private static final String host = "";
private static final String host = "master";
private static final String host = "";
private Connection conn;
......@@ -2,7 +2,6 @@ package com.taosdata.jdbc.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBError;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
......@@ -18,13 +17,21 @@ public class HttpClientPoolUtilTest {
String user = "root";
String password = "taosdata";
String host = "";
String dbname = "log";
// String host = "master";
public void test() {
public void useLog() {
// given
List<Thread> threads = IntStream.range(0, 4000).mapToObj(i -> new Thread(() -> {
int multi = 10;
// when
List<Thread> threads = IntStream.range(0, multi).mapToObj(i -> new Thread(() -> {
try {
String token = login(multi);
executeOneSql("use log", token);
} catch (SQLException | UnsupportedEncodingException e) {
......@@ -38,31 +45,60 @@ public class HttpClientPoolUtilTest {
private void useDB() {
try {
user = URLEncoder.encode(user, StandardCharsets.UTF_8.displayName());
password = URLEncoder.encode(password, StandardCharsets.UTF_8.displayName());
String loginUrl = "http://" + host + ":" + 6041 + "/rest/login/" + user + "/" + password + "";
String result = HttpClientPoolUtil.execute(loginUrl);
JSONObject jsonResult = JSON.parseObject(result);
String status = jsonResult.getString("status");
String token = jsonResult.getString("desc");
if (!status.equals("succ")) {
throw new SQLException(jsonResult.getString("desc"));
public void tableNotExist() {
// given
int multi = 20;
// when
List<Thread> threads = IntStream.range(0, multi * 25).mapToObj(i -> new Thread(() -> {
try {
// String token = "/KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04";
String token = login(multi);
executeOneSql("insert into log.tb_not_exist values(now, 1)", token);
executeOneSql("select last(*) from log.dn", token);
} catch (SQLException | UnsupportedEncodingException e) {
String url = "http://" + host + ":6041/rest/sql";
String sql = "use " + dbname;
result = HttpClientPoolUtil.execute(url, sql, token);
JSONObject resultJson = JSON.parseObject(result);
if (resultJson.getString("status").equals("error")) {
throw TSDBError.createSQLException(resultJson.getInteger("code"), resultJson.getString("desc"));
for (Thread thread : threads) {
try {
} catch (InterruptedException e) {
} catch (UnsupportedEncodingException | SQLException e) {
private String login(int connPoolSize) throws SQLException, UnsupportedEncodingException {
user = URLEncoder.encode(user, StandardCharsets.UTF_8.displayName());
password = URLEncoder.encode(password, StandardCharsets.UTF_8.displayName());
String loginUrl = "http://" + host + ":" + 6041 + "/rest/login/" + user + "/" + password + "";
HttpClientPoolUtil.init(connPoolSize, false);
String result = HttpClientPoolUtil.execute(loginUrl);
JSONObject jsonResult = JSON.parseObject(result);
String status = jsonResult.getString("status");
String token = jsonResult.getString("desc");
if (!status.equals("succ")) {
throw new SQLException(jsonResult.getString("desc"));
return token;
private boolean executeOneSql(String sql, String token) throws SQLException {
String url = "http://" + host + ":6041/rest/sql";
String result = HttpClientPoolUtil.execute(url, sql, token);
JSONObject resultJson = JSON.parseObject(result);
if (resultJson.getString("status").equals("error")) {
// HttpClientPoolUtil.reset();
// throw TSDBError.createSQLException(resultJson.getInteger("code"), resultJson.getString("desc"));
return false;
return true;
\ No newline at end of file
org.apache.commons.logging.simplelog.dateTimeFormat=yyyy-mm-dd hh:MM:ss.SSS
\ No newline at end of file
......@@ -18,7 +18,7 @@
<!-- druid -->
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册