diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml index 810a85f8a33b3f244dab81e349b9df786ec50c21..a586879afe61b9272712a14f36c60fbd85ba80ed 100644 --- a/src/connector/jdbc/pom.xml +++ b/src/connector/jdbc/pom.xml @@ -112,6 +112,7 @@ **/*Test.java + **/HttpClientPoolUtilTest.java **/AppMemoryLeakTest.java **/JDBCTypeAndTypeCompareTest.java **/ConnectMultiTaosdByRestfulWithDifferentTokenTest.java diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java index 0f4427fa20e272917df0327552efd1a80cd56b4d..1c380fed7dac2c54655830eef6f575e9c07e22af 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java @@ -32,6 +32,7 @@ public class TSDBErrorNumbers { public static final int ERROR_USER_IS_REQUIRED = 0x2319; // user is required public static final int ERROR_PASSWORD_IS_REQUIRED = 0x231a; // password is required public static final int ERROR_INVALID_JSON_FORMAT = 0x231b; + public static final int ERROR_HTTP_ENTITY_IS_NULL = 0x231c; //http entity is null public static final int ERROR_UNKNOWN = 0x2350; //unknown error @@ -74,6 +75,7 @@ public class TSDBErrorNumbers { errorNumbers.add(ERROR_USER_IS_REQUIRED); errorNumbers.add(ERROR_PASSWORD_IS_REQUIRED); errorNumbers.add(ERROR_INVALID_JSON_FORMAT); + errorNumbers.add(ERROR_HTTP_ENTITY_IS_NULL); errorNumbers.add(ERROR_RESTFul_Client_Protocol_Exception); diff --git a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/HttpClientPoolUtil.java b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/HttpClientPoolUtil.java index de26ab7f1f458a4587ce15bebab3c2c1b0dbc070..99e46bc64f44f6326aec12734849cc5ef518c903 100644 --- a/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/HttpClientPoolUtil.java +++ b/src/connector/jdbc/src/main/java/com/taosdata/jdbc/utils/HttpClientPoolUtil.java @@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBErrorNumbers; import org.apache.http.HeaderElement; import org.apache.http.HeaderElementIterator; import org.apache.http.HttpEntity; +import org.apache.http.NoHttpResponseException; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.RequestConfig; @@ -30,12 +31,12 @@ import java.sql.SQLException; public class HttpClientPoolUtil { private static final String DEFAULT_CONTENT_TYPE = "application/json"; - private static final int DEFAULT_MAX_TOTAL = 200; - private static final int DEFAULT_MAX_PER_ROUTE = 20; - private static final int DEFAULT_TIME_OUT = 15000; - private static final int DEFAULT_HTTP_KEEP_TIME = 15000; private static final int DEFAULT_MAX_RETRY_COUNT = 5; + private static final int DEFAULT_MAX_TOTAL = 50; + private static final int DEFAULT_MAX_PER_ROUTE = 5; + private static final int DEFAULT_HTTP_KEEP_TIME = -1; + private static final ConnectionKeepAliveStrategy DEFAULT_KEEP_ALIVE_STRATEGY = (response, context) -> { HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE)); while (it.hasNext()) { @@ -52,29 +53,19 @@ public class HttpClientPoolUtil { return DEFAULT_HTTP_KEEP_TIME * 1000; }; - private static final HttpRequestRetryHandler retryHandler = (exception, executionCount, httpContext) -> { - if (executionCount >= DEFAULT_MAX_RETRY_COUNT) - // do not retry if over max retry count - return false; - if (exception instanceof InterruptedIOException) - // timeout - return false; - if (exception instanceof UnknownHostException) - // unknown host - return false; - if (exception instanceof SSLException) - // SSL handshake exception - return false; - return true; - }; - private static CloseableHttpClient httpClient; static { + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL); connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE); - httpClient = HttpClients.custom().setKeepAliveStrategy(DEFAULT_KEEP_ALIVE_STRATEGY).setConnectionManager(connectionManager).setRetryHandler(retryHandler).build(); + + httpClient = HttpClients.custom() + .setKeepAliveStrategy(DEFAULT_KEEP_ALIVE_STRATEGY) + .setConnectionManager(connectionManager) + .setRetryHandler((exception, executionCount, httpContext) -> executionCount < DEFAULT_MAX_RETRY_COUNT) + .build(); } /*** execute GET request ***/ @@ -118,9 +109,10 @@ public class HttpClientPoolUtil { HttpContext context = HttpClientContext.create(); CloseableHttpResponse httpResponse = httpClient.execute(method, context); httpEntity = httpResponse.getEntity(); - if (httpEntity != null) { - responseBody = EntityUtils.toString(httpEntity, StandardCharsets.UTF_8); + if (httpEntity == null) { + throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_HTTP_ENTITY_IS_NULL, "httpEntity is null, sql: " + data); } + responseBody = EntityUtils.toString(httpEntity, StandardCharsets.UTF_8); } catch (ClientProtocolException e) { e.printStackTrace(); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_Protocol_Exception, e.getMessage()); @@ -139,9 +131,6 @@ public class HttpClientPoolUtil { private static HttpRequestBase getRequest(String uri, String methodName) { HttpRequestBase method; RequestConfig requestConfig = RequestConfig.custom() - .setSocketTimeout(DEFAULT_TIME_OUT * 1000) - .setConnectTimeout(DEFAULT_TIME_OUT * 1000) - .setConnectionRequestTimeout(DEFAULT_TIME_OUT * 1000) .setExpectContinueEnabled(false) .build(); if (HttpPut.METHOD_NAME.equalsIgnoreCase(methodName)) { diff --git a/src/connector/jdbc/src/test/java/com/taosdata/jdbc/utils/HttpClientPoolUtilTest.java b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/utils/HttpClientPoolUtilTest.java new file mode 100644 index 0000000000000000000000000000000000000000..cae33f18e7a04e443092d8e696bb32be9600a435 --- /dev/null +++ b/src/connector/jdbc/src/test/java/com/taosdata/jdbc/utils/HttpClientPoolUtilTest.java @@ -0,0 +1,76 @@ +package com.taosdata.jdbc.utils; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.taosdata.jdbc.TSDBDriver; +import com.taosdata.jdbc.TSDBError; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class HttpClientPoolUtilTest { + + String user = "root"; + String password = "taosdata"; + String host = "127.0.0.1"; + String dbname = "log"; + + @Test + public void test() { + // given + List threads = IntStream.range(0, 4000).mapToObj(i -> new Thread(() -> { + useDB(); +// try { +// TimeUnit.SECONDS.sleep(10); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } + })).collect(Collectors.toList()); + + threads.forEach(Thread::start); + + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + } + + private void useDB() { + try { + user = URLEncoder.encode(user, StandardCharsets.UTF_8.displayName()); + password = URLEncoder.encode(password, StandardCharsets.UTF_8.displayName()); + String loginUrl = "http://" + host + ":" + 6041 + "/rest/login/" + user + "/" + password + ""; + String result = HttpClientPoolUtil.execute(loginUrl); + JSONObject jsonResult = JSON.parseObject(result); + String status = jsonResult.getString("status"); + String token = jsonResult.getString("desc"); + if (!status.equals("succ")) { + throw new SQLException(jsonResult.getString("desc")); + } + + String url = "http://" + host + ":6041/rest/sql"; + String sql = "use " + dbname; + result = HttpClientPoolUtil.execute(url, sql, token); + + JSONObject resultJson = JSON.parseObject(result); + if (resultJson.getString("status").equals("error")) { + throw TSDBError.createSQLException(resultJson.getInteger("code"), resultJson.getString("desc")); + } + } catch (UnsupportedEncodingException | SQLException e) { + e.printStackTrace(); + } + } + + +} \ No newline at end of file diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index 6c567e23bc817957d7f376ef101f8e5ca88559e6..bf8efd2831e19480615be291d409d53207bb8f63 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -140,28 +140,29 @@ typedef enum { } EHTTP_CONTEXT_FAILED_CAUSE; typedef struct HttpContext { - int32_t refCount; - SOCKET fd; - uint32_t accessTimes; - uint32_t lastAccessTime; - int32_t state; - uint8_t reqType; - uint8_t parsed; - uint8_t error; - char ipstr[22]; - char user[TSDB_USER_LEN]; // parsed from auth token or login message - char pass[HTTP_PASSWORD_LEN]; - char db[/*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN]; - TAOS * taos; - void * ppContext; - HttpSession *session; - z_stream gzipStream; - HttpParser *parser; - HttpSqlCmd singleCmd; - HttpSqlCmds *multiCmds; - JsonBuf * jsonBuf; - HttpEncodeMethod *encodeMethod; - HttpDecodeMethod *decodeMethod; + int32_t refCount; + SOCKET fd; + uint32_t accessTimes; + uint32_t lastAccessTime; + int32_t state; + uint8_t reqType; + uint8_t parsed; + uint8_t error; + char ipstr[22]; + char user[TSDB_USER_LEN]; // parsed from auth token or login message + char pass[HTTP_PASSWORD_LEN]; + char db[/*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN]; + TAOS * taos; + void * ppContext; + pthread_mutex_t ctxMutex; + HttpSession *session; + z_stream gzipStream; + HttpParser *parser; + HttpSqlCmd singleCmd; + HttpSqlCmds *multiCmds; + JsonBuf *jsonBuf; + HttpEncodeMethod *encodeMethod; + HttpDecodeMethod *decodeMethod; struct HttpThread *pThread; } HttpContext; diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 11945453c56ab7fdd1fc8b0c4f2510bbbdda1a6e..ccbcc985118b132369a1ee3895f4341e6cca6d59 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -67,6 +67,8 @@ static void httpDestroyContext(void *data) { pContext->parser = NULL; } + pthread_mutex_destroy(&pContext->ctxMutex); + tfree(pContext); } @@ -128,6 +130,8 @@ HttpContext *httpCreateContext(SOCKET fd) { // set the ref to 0 taosCacheRelease(tsHttpServer.contextCache, (void **)&ppContext, false); + pthread_mutex_init(&pContext->ctxMutex, NULL); + return pContext; } diff --git a/src/plugins/http/src/httpHandle.c b/src/plugins/http/src/httpHandle.c index 9719d93824b50064ec1cf23677c641428434592c..6f77994593ebbfc1dc2d9ce97b15a90a797dd8d5 100644 --- a/src/plugins/http/src/httpHandle.c +++ b/src/plugins/http/src/httpHandle.c @@ -45,15 +45,14 @@ bool httpProcessData(HttpContext* pContext) { httpTrace("context:%p, fd:%d, process options request", pContext, pContext->fd); httpSendOptionResp(pContext, "process options request success"); } else { - if (!httpDecodeRequest(pContext)) { - /* - * httpCloseContextByApp has been called when parsing the error - */ - // httpCloseContextByApp(pContext); - } else { + pthread_mutex_lock(&pContext->ctxMutex); + + if (httpDecodeRequest(pContext)) { httpClearParser(pContext->parser); httpProcessRequest(pContext); } + + pthread_mutex_unlock(&pContext->ctxMutex); } return true; diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index 602767a6563b3ca3430501c0dbcee65333f1d44b..e1b3b17347cba786fa12f5cc2fa7ab3cfb45bd54 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -406,7 +406,14 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) { if (pContext->session == NULL) { httpSendErrorResp(pContext, TSDB_CODE_HTTP_SESSION_FULL); } else { + // httpProcessRequestCb called by another thread and a subsequent thread calls this + // function again, if this function called by httpProcessRequestCb executes memset + // just before the subsequent thread executes *Cmd function, nativSql will be NULL + pthread_mutex_lock(&pContext->ctxMutex); + httpExecCmd(pContext); + + pthread_mutex_unlock(&pContext->ctxMutex); } }