未验证 提交 02ef66aa 编写于 作者: Z Zhiyu Yang 提交者: GitHub

[TS-498]<fix>: alter keep alive time in restful pool (#8503)

* [TS-498]<test>: add a test case for HttpClientPoolUtil

* [TS-498]<fix>: fixed 'no sql input' bug

* throw exception when httpEntity is null

* [TS-498]<fix>: alter Keep Alive Time in RESTful pool
Co-authored-by: Nxywang <xywang@taosdata.com>
上级 d326cfc6
...@@ -112,6 +112,7 @@ ...@@ -112,6 +112,7 @@
<include>**/*Test.java</include> <include>**/*Test.java</include>
</includes> </includes>
<excludes> <excludes>
<exclude>**/HttpClientPoolUtilTest.java</exclude>
<exclude>**/AppMemoryLeakTest.java</exclude> <exclude>**/AppMemoryLeakTest.java</exclude>
<exclude>**/JDBCTypeAndTypeCompareTest.java</exclude> <exclude>**/JDBCTypeAndTypeCompareTest.java</exclude>
<exclude>**/ConnectMultiTaosdByRestfulWithDifferentTokenTest.java</exclude> <exclude>**/ConnectMultiTaosdByRestfulWithDifferentTokenTest.java</exclude>
......
...@@ -32,6 +32,7 @@ public class TSDBErrorNumbers { ...@@ -32,6 +32,7 @@ public class TSDBErrorNumbers {
public static final int ERROR_USER_IS_REQUIRED = 0x2319; // user is required public static final int ERROR_USER_IS_REQUIRED = 0x2319; // user is required
public static final int ERROR_PASSWORD_IS_REQUIRED = 0x231a; // password is required public static final int ERROR_PASSWORD_IS_REQUIRED = 0x231a; // password is required
public static final int ERROR_INVALID_JSON_FORMAT = 0x231b; public static final int ERROR_INVALID_JSON_FORMAT = 0x231b;
public static final int ERROR_HTTP_ENTITY_IS_NULL = 0x231c; //http entity is null
public static final int ERROR_UNKNOWN = 0x2350; //unknown error public static final int ERROR_UNKNOWN = 0x2350; //unknown error
...@@ -74,6 +75,7 @@ public class TSDBErrorNumbers { ...@@ -74,6 +75,7 @@ public class TSDBErrorNumbers {
errorNumbers.add(ERROR_USER_IS_REQUIRED); errorNumbers.add(ERROR_USER_IS_REQUIRED);
errorNumbers.add(ERROR_PASSWORD_IS_REQUIRED); errorNumbers.add(ERROR_PASSWORD_IS_REQUIRED);
errorNumbers.add(ERROR_INVALID_JSON_FORMAT); errorNumbers.add(ERROR_INVALID_JSON_FORMAT);
errorNumbers.add(ERROR_HTTP_ENTITY_IS_NULL);
errorNumbers.add(ERROR_RESTFul_Client_Protocol_Exception); errorNumbers.add(ERROR_RESTFul_Client_Protocol_Exception);
......
...@@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBErrorNumbers; ...@@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBErrorNumbers;
import org.apache.http.HeaderElement; import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator; import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.ClientProtocolException; import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig; import org.apache.http.client.config.RequestConfig;
...@@ -30,12 +31,12 @@ import java.sql.SQLException; ...@@ -30,12 +31,12 @@ import java.sql.SQLException;
public class HttpClientPoolUtil { public class HttpClientPoolUtil {
private static final String DEFAULT_CONTENT_TYPE = "application/json"; private static final String DEFAULT_CONTENT_TYPE = "application/json";
private static final int DEFAULT_MAX_TOTAL = 200;
private static final int DEFAULT_MAX_PER_ROUTE = 20;
private static final int DEFAULT_TIME_OUT = 15000;
private static final int DEFAULT_HTTP_KEEP_TIME = 15000;
private static final int DEFAULT_MAX_RETRY_COUNT = 5; private static final int DEFAULT_MAX_RETRY_COUNT = 5;
private static final int DEFAULT_MAX_TOTAL = 50;
private static final int DEFAULT_MAX_PER_ROUTE = 5;
private static final int DEFAULT_HTTP_KEEP_TIME = -1;
private static final ConnectionKeepAliveStrategy DEFAULT_KEEP_ALIVE_STRATEGY = (response, context) -> { private static final ConnectionKeepAliveStrategy DEFAULT_KEEP_ALIVE_STRATEGY = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE)); HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) { while (it.hasNext()) {
...@@ -52,29 +53,19 @@ public class HttpClientPoolUtil { ...@@ -52,29 +53,19 @@ public class HttpClientPoolUtil {
return DEFAULT_HTTP_KEEP_TIME * 1000; return DEFAULT_HTTP_KEEP_TIME * 1000;
}; };
private static final HttpRequestRetryHandler retryHandler = (exception, executionCount, httpContext) -> {
if (executionCount >= DEFAULT_MAX_RETRY_COUNT)
// do not retry if over max retry count
return false;
if (exception instanceof InterruptedIOException)
// timeout
return false;
if (exception instanceof UnknownHostException)
// unknown host
return false;
if (exception instanceof SSLException)
// SSL handshake exception
return false;
return true;
};
private static CloseableHttpClient httpClient; private static CloseableHttpClient httpClient;
static { static {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL); connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL);
connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE); connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
httpClient = HttpClients.custom().setKeepAliveStrategy(DEFAULT_KEEP_ALIVE_STRATEGY).setConnectionManager(connectionManager).setRetryHandler(retryHandler).build();
httpClient = HttpClients.custom()
.setKeepAliveStrategy(DEFAULT_KEEP_ALIVE_STRATEGY)
.setConnectionManager(connectionManager)
.setRetryHandler((exception, executionCount, httpContext) -> executionCount < DEFAULT_MAX_RETRY_COUNT)
.build();
} }
/*** execute GET request ***/ /*** execute GET request ***/
...@@ -118,9 +109,10 @@ public class HttpClientPoolUtil { ...@@ -118,9 +109,10 @@ public class HttpClientPoolUtil {
HttpContext context = HttpClientContext.create(); HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context); CloseableHttpResponse httpResponse = httpClient.execute(method, context);
httpEntity = httpResponse.getEntity(); httpEntity = httpResponse.getEntity();
if (httpEntity != null) { if (httpEntity == null) {
responseBody = EntityUtils.toString(httpEntity, StandardCharsets.UTF_8); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_HTTP_ENTITY_IS_NULL, "httpEntity is null, sql: " + data);
} }
responseBody = EntityUtils.toString(httpEntity, StandardCharsets.UTF_8);
} catch (ClientProtocolException e) { } catch (ClientProtocolException e) {
e.printStackTrace(); e.printStackTrace();
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_Protocol_Exception, e.getMessage()); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_Protocol_Exception, e.getMessage());
...@@ -139,9 +131,6 @@ public class HttpClientPoolUtil { ...@@ -139,9 +131,6 @@ public class HttpClientPoolUtil {
private static HttpRequestBase getRequest(String uri, String methodName) { private static HttpRequestBase getRequest(String uri, String methodName) {
HttpRequestBase method; HttpRequestBase method;
RequestConfig requestConfig = RequestConfig.custom() RequestConfig requestConfig = RequestConfig.custom()
.setSocketTimeout(DEFAULT_TIME_OUT * 1000)
.setConnectTimeout(DEFAULT_TIME_OUT * 1000)
.setConnectionRequestTimeout(DEFAULT_TIME_OUT * 1000)
.setExpectContinueEnabled(false) .setExpectContinueEnabled(false)
.build(); .build();
if (HttpPut.METHOD_NAME.equalsIgnoreCase(methodName)) { if (HttpPut.METHOD_NAME.equalsIgnoreCase(methodName)) {
......
package com.taosdata.jdbc.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class HttpClientPoolUtilTest {
String user = "root";
String password = "taosdata";
String host = "127.0.0.1";
String dbname = "log";
@Test
public void test() {
// given
List<Thread> threads = IntStream.range(0, 4000).mapToObj(i -> new Thread(() -> {
useDB();
// try {
// TimeUnit.SECONDS.sleep(10);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
})).collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void useDB() {
try {
user = URLEncoder.encode(user, StandardCharsets.UTF_8.displayName());
password = URLEncoder.encode(password, StandardCharsets.UTF_8.displayName());
String loginUrl = "http://" + host + ":" + 6041 + "/rest/login/" + user + "/" + password + "";
String result = HttpClientPoolUtil.execute(loginUrl);
JSONObject jsonResult = JSON.parseObject(result);
String status = jsonResult.getString("status");
String token = jsonResult.getString("desc");
if (!status.equals("succ")) {
throw new SQLException(jsonResult.getString("desc"));
}
String url = "http://" + host + ":6041/rest/sql";
String sql = "use " + dbname;
result = HttpClientPoolUtil.execute(url, sql, token);
JSONObject resultJson = JSON.parseObject(result);
if (resultJson.getString("status").equals("error")) {
throw TSDBError.createSQLException(resultJson.getInteger("code"), resultJson.getString("desc"));
}
} catch (UnsupportedEncodingException | SQLException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
...@@ -140,28 +140,29 @@ typedef enum { ...@@ -140,28 +140,29 @@ typedef enum {
} EHTTP_CONTEXT_FAILED_CAUSE; } EHTTP_CONTEXT_FAILED_CAUSE;
typedef struct HttpContext { typedef struct HttpContext {
int32_t refCount; int32_t refCount;
SOCKET fd; SOCKET fd;
uint32_t accessTimes; uint32_t accessTimes;
uint32_t lastAccessTime; uint32_t lastAccessTime;
int32_t state; int32_t state;
uint8_t reqType; uint8_t reqType;
uint8_t parsed; uint8_t parsed;
uint8_t error; uint8_t error;
char ipstr[22]; char ipstr[22];
char user[TSDB_USER_LEN]; // parsed from auth token or login message char user[TSDB_USER_LEN]; // parsed from auth token or login message
char pass[HTTP_PASSWORD_LEN]; char pass[HTTP_PASSWORD_LEN];
char db[/*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN]; char db[/*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN];
TAOS * taos; TAOS * taos;
void * ppContext; void * ppContext;
HttpSession *session; pthread_mutex_t ctxMutex;
z_stream gzipStream; HttpSession *session;
HttpParser *parser; z_stream gzipStream;
HttpSqlCmd singleCmd; HttpParser *parser;
HttpSqlCmds *multiCmds; HttpSqlCmd singleCmd;
JsonBuf * jsonBuf; HttpSqlCmds *multiCmds;
HttpEncodeMethod *encodeMethod; JsonBuf *jsonBuf;
HttpDecodeMethod *decodeMethod; HttpEncodeMethod *encodeMethod;
HttpDecodeMethod *decodeMethod;
struct HttpThread *pThread; struct HttpThread *pThread;
} HttpContext; } HttpContext;
......
...@@ -67,6 +67,8 @@ static void httpDestroyContext(void *data) { ...@@ -67,6 +67,8 @@ static void httpDestroyContext(void *data) {
pContext->parser = NULL; pContext->parser = NULL;
} }
pthread_mutex_destroy(&pContext->ctxMutex);
tfree(pContext); tfree(pContext);
} }
...@@ -128,6 +130,8 @@ HttpContext *httpCreateContext(SOCKET fd) { ...@@ -128,6 +130,8 @@ HttpContext *httpCreateContext(SOCKET fd) {
// set the ref to 0 // set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void **)&ppContext, false); taosCacheRelease(tsHttpServer.contextCache, (void **)&ppContext, false);
pthread_mutex_init(&pContext->ctxMutex, NULL);
return pContext; return pContext;
} }
......
...@@ -45,15 +45,14 @@ bool httpProcessData(HttpContext* pContext) { ...@@ -45,15 +45,14 @@ bool httpProcessData(HttpContext* pContext) {
httpTrace("context:%p, fd:%d, process options request", pContext, pContext->fd); httpTrace("context:%p, fd:%d, process options request", pContext, pContext->fd);
httpSendOptionResp(pContext, "process options request success"); httpSendOptionResp(pContext, "process options request success");
} else { } else {
if (!httpDecodeRequest(pContext)) { pthread_mutex_lock(&pContext->ctxMutex);
/*
* httpCloseContextByApp has been called when parsing the error if (httpDecodeRequest(pContext)) {
*/
// httpCloseContextByApp(pContext);
} else {
httpClearParser(pContext->parser); httpClearParser(pContext->parser);
httpProcessRequest(pContext); httpProcessRequest(pContext);
} }
pthread_mutex_unlock(&pContext->ctxMutex);
} }
return true; return true;
......
...@@ -406,7 +406,14 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) { ...@@ -406,7 +406,14 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) {
if (pContext->session == NULL) { if (pContext->session == NULL) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_SESSION_FULL); httpSendErrorResp(pContext, TSDB_CODE_HTTP_SESSION_FULL);
} else { } else {
// httpProcessRequestCb called by another thread and a subsequent thread calls this
// function again, if this function called by httpProcessRequestCb executes memset
// just before the subsequent thread executes *Cmd function, nativSql will be NULL
pthread_mutex_lock(&pContext->ctxMutex);
httpExecCmd(pContext); httpExecCmd(pContext);
pthread_mutex_unlock(&pContext->ctxMutex);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册