未验证 提交 dde16ee4 编写于 作者: C Cary Xu 提交者: GitHub

Merge branch 'develop' into hotfix/TD-10824-D

IF (TD_LINUX) IF (TD_LINUX)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh") SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})") INSTALL(CODE "execute_process(COMMAND bash ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Linux ${TD_VER_NUMBER})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Linux ${TD_VER_NUMBER})")
ELSEIF (TD_WINDOWS) ELSEIF (TD_WINDOWS)
IF (TD_POWER) IF (TD_POWER)
SET(CMAKE_INSTALL_PREFIX C:/PowerDB) SET(CMAKE_INSTALL_PREFIX C:/PowerDB)
...@@ -41,6 +40,5 @@ ELSEIF (TD_WINDOWS) ...@@ -41,6 +40,5 @@ ELSEIF (TD_WINDOWS)
ELSEIF (TD_DARWIN) ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh") SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
INSTALL(CODE "execute_process(COMMAND chmod 777 ${TD_MAKE_INSTALL_SH})") INSTALL(CODE "execute_process(COMMAND bash ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Darwin ${TD_VER_NUMBER})")
INSTALL(CODE "execute_process(COMMAND ${TD_MAKE_INSTALL_SH} ${TD_COMMUNITY_DIR} ${PROJECT_BINARY_DIR} Darwin ${TD_VER_NUMBER})")
ENDIF () ENDIF ()
Subproject commit edad746514b2a53a8cf6061c93b98b52a5388692 Subproject commit 9ae793ad2d567eb11d10627b65698f612542e988
...@@ -112,6 +112,7 @@ ...@@ -112,6 +112,7 @@
<include>**/*Test.java</include> <include>**/*Test.java</include>
</includes> </includes>
<excludes> <excludes>
<exclude>**/HttpClientPoolUtilTest.java</exclude>
<exclude>**/AppMemoryLeakTest.java</exclude> <exclude>**/AppMemoryLeakTest.java</exclude>
<exclude>**/JDBCTypeAndTypeCompareTest.java</exclude> <exclude>**/JDBCTypeAndTypeCompareTest.java</exclude>
<exclude>**/ConnectMultiTaosdByRestfulWithDifferentTokenTest.java</exclude> <exclude>**/ConnectMultiTaosdByRestfulWithDifferentTokenTest.java</exclude>
......
...@@ -32,6 +32,7 @@ public class TSDBErrorNumbers { ...@@ -32,6 +32,7 @@ public class TSDBErrorNumbers {
public static final int ERROR_USER_IS_REQUIRED = 0x2319; // user is required public static final int ERROR_USER_IS_REQUIRED = 0x2319; // user is required
public static final int ERROR_PASSWORD_IS_REQUIRED = 0x231a; // password is required public static final int ERROR_PASSWORD_IS_REQUIRED = 0x231a; // password is required
public static final int ERROR_INVALID_JSON_FORMAT = 0x231b; public static final int ERROR_INVALID_JSON_FORMAT = 0x231b;
public static final int ERROR_HTTP_ENTITY_IS_NULL = 0x231c; //http entity is null
public static final int ERROR_UNKNOWN = 0x2350; //unknown error public static final int ERROR_UNKNOWN = 0x2350; //unknown error
...@@ -74,6 +75,7 @@ public class TSDBErrorNumbers { ...@@ -74,6 +75,7 @@ public class TSDBErrorNumbers {
errorNumbers.add(ERROR_USER_IS_REQUIRED); errorNumbers.add(ERROR_USER_IS_REQUIRED);
errorNumbers.add(ERROR_PASSWORD_IS_REQUIRED); errorNumbers.add(ERROR_PASSWORD_IS_REQUIRED);
errorNumbers.add(ERROR_INVALID_JSON_FORMAT); errorNumbers.add(ERROR_INVALID_JSON_FORMAT);
errorNumbers.add(ERROR_HTTP_ENTITY_IS_NULL);
errorNumbers.add(ERROR_RESTFul_Client_Protocol_Exception); errorNumbers.add(ERROR_RESTFul_Client_Protocol_Exception);
......
...@@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBErrorNumbers; ...@@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBErrorNumbers;
import org.apache.http.HeaderElement; import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator; import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.ClientProtocolException; import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig; import org.apache.http.client.config.RequestConfig;
...@@ -30,12 +31,12 @@ import java.sql.SQLException; ...@@ -30,12 +31,12 @@ import java.sql.SQLException;
public class HttpClientPoolUtil { public class HttpClientPoolUtil {
private static final String DEFAULT_CONTENT_TYPE = "application/json"; private static final String DEFAULT_CONTENT_TYPE = "application/json";
private static final int DEFAULT_MAX_TOTAL = 200;
private static final int DEFAULT_MAX_PER_ROUTE = 20;
private static final int DEFAULT_TIME_OUT = 15000;
private static final int DEFAULT_HTTP_KEEP_TIME = 15000;
private static final int DEFAULT_MAX_RETRY_COUNT = 5; private static final int DEFAULT_MAX_RETRY_COUNT = 5;
private static final int DEFAULT_MAX_TOTAL = 50;
private static final int DEFAULT_MAX_PER_ROUTE = 5;
private static final int DEFAULT_HTTP_KEEP_TIME = -1;
private static final ConnectionKeepAliveStrategy DEFAULT_KEEP_ALIVE_STRATEGY = (response, context) -> { private static final ConnectionKeepAliveStrategy DEFAULT_KEEP_ALIVE_STRATEGY = (response, context) -> {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE)); HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) { while (it.hasNext()) {
...@@ -52,29 +53,19 @@ public class HttpClientPoolUtil { ...@@ -52,29 +53,19 @@ public class HttpClientPoolUtil {
return DEFAULT_HTTP_KEEP_TIME * 1000; return DEFAULT_HTTP_KEEP_TIME * 1000;
}; };
private static final HttpRequestRetryHandler retryHandler = (exception, executionCount, httpContext) -> {
if (executionCount >= DEFAULT_MAX_RETRY_COUNT)
// do not retry if over max retry count
return false;
if (exception instanceof InterruptedIOException)
// timeout
return false;
if (exception instanceof UnknownHostException)
// unknown host
return false;
if (exception instanceof SSLException)
// SSL handshake exception
return false;
return true;
};
private static CloseableHttpClient httpClient; private static CloseableHttpClient httpClient;
static { static {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL); connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL);
connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE); connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
httpClient = HttpClients.custom().setKeepAliveStrategy(DEFAULT_KEEP_ALIVE_STRATEGY).setConnectionManager(connectionManager).setRetryHandler(retryHandler).build();
httpClient = HttpClients.custom()
.setKeepAliveStrategy(DEFAULT_KEEP_ALIVE_STRATEGY)
.setConnectionManager(connectionManager)
.setRetryHandler((exception, executionCount, httpContext) -> executionCount < DEFAULT_MAX_RETRY_COUNT)
.build();
} }
/*** execute GET request ***/ /*** execute GET request ***/
...@@ -118,9 +109,10 @@ public class HttpClientPoolUtil { ...@@ -118,9 +109,10 @@ public class HttpClientPoolUtil {
HttpContext context = HttpClientContext.create(); HttpContext context = HttpClientContext.create();
CloseableHttpResponse httpResponse = httpClient.execute(method, context); CloseableHttpResponse httpResponse = httpClient.execute(method, context);
httpEntity = httpResponse.getEntity(); httpEntity = httpResponse.getEntity();
if (httpEntity != null) { if (httpEntity == null) {
responseBody = EntityUtils.toString(httpEntity, StandardCharsets.UTF_8); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_HTTP_ENTITY_IS_NULL, "httpEntity is null, sql: " + data);
} }
responseBody = EntityUtils.toString(httpEntity, StandardCharsets.UTF_8);
} catch (ClientProtocolException e) { } catch (ClientProtocolException e) {
e.printStackTrace(); e.printStackTrace();
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_Protocol_Exception, e.getMessage()); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_Protocol_Exception, e.getMessage());
...@@ -139,9 +131,6 @@ public class HttpClientPoolUtil { ...@@ -139,9 +131,6 @@ public class HttpClientPoolUtil {
private static HttpRequestBase getRequest(String uri, String methodName) { private static HttpRequestBase getRequest(String uri, String methodName) {
HttpRequestBase method; HttpRequestBase method;
RequestConfig requestConfig = RequestConfig.custom() RequestConfig requestConfig = RequestConfig.custom()
.setSocketTimeout(DEFAULT_TIME_OUT * 1000)
.setConnectTimeout(DEFAULT_TIME_OUT * 1000)
.setConnectionRequestTimeout(DEFAULT_TIME_OUT * 1000)
.setExpectContinueEnabled(false) .setExpectContinueEnabled(false)
.build(); .build();
if (HttpPut.METHOD_NAME.equalsIgnoreCase(methodName)) { if (HttpPut.METHOD_NAME.equalsIgnoreCase(methodName)) {
......
package com.taosdata.jdbc.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBError;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class HttpClientPoolUtilTest {
String user = "root";
String password = "taosdata";
String host = "127.0.0.1";
String dbname = "log";
@Test
public void test() {
// given
List<Thread> threads = IntStream.range(0, 4000).mapToObj(i -> new Thread(() -> {
useDB();
// try {
// TimeUnit.SECONDS.sleep(10);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
})).collect(Collectors.toList());
threads.forEach(Thread::start);
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void useDB() {
try {
user = URLEncoder.encode(user, StandardCharsets.UTF_8.displayName());
password = URLEncoder.encode(password, StandardCharsets.UTF_8.displayName());
String loginUrl = "http://" + host + ":" + 6041 + "/rest/login/" + user + "/" + password + "";
String result = HttpClientPoolUtil.execute(loginUrl);
JSONObject jsonResult = JSON.parseObject(result);
String status = jsonResult.getString("status");
String token = jsonResult.getString("desc");
if (!status.equals("succ")) {
throw new SQLException(jsonResult.getString("desc"));
}
String url = "http://" + host + ":6041/rest/sql";
String sql = "use " + dbname;
result = HttpClientPoolUtil.execute(url, sql, token);
JSONObject resultJson = JSON.parseObject(result);
if (resultJson.getString("status").equals("error")) {
throw TSDBError.createSQLException(resultJson.getInteger("code"), resultJson.getString("desc"));
}
} catch (UnsupportedEncodingException | SQLException e) {
e.printStackTrace();
}
}
}
\ No newline at end of file
...@@ -154,12 +154,13 @@ typedef struct HttpContext { ...@@ -154,12 +154,13 @@ typedef struct HttpContext {
char db[/*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN]; char db[/*TSDB_ACCT_ID_LEN + */TSDB_DB_NAME_LEN];
TAOS * taos; TAOS * taos;
void * ppContext; void * ppContext;
pthread_mutex_t ctxMutex;
HttpSession *session; HttpSession *session;
z_stream gzipStream; z_stream gzipStream;
HttpParser *parser; HttpParser *parser;
HttpSqlCmd singleCmd; HttpSqlCmd singleCmd;
HttpSqlCmds *multiCmds; HttpSqlCmds *multiCmds;
JsonBuf * jsonBuf; JsonBuf *jsonBuf;
HttpEncodeMethod *encodeMethod; HttpEncodeMethod *encodeMethod;
HttpDecodeMethod *decodeMethod; HttpDecodeMethod *decodeMethod;
struct HttpThread *pThread; struct HttpThread *pThread;
......
...@@ -67,6 +67,8 @@ static void httpDestroyContext(void *data) { ...@@ -67,6 +67,8 @@ static void httpDestroyContext(void *data) {
pContext->parser = NULL; pContext->parser = NULL;
} }
pthread_mutex_destroy(&pContext->ctxMutex);
tfree(pContext); tfree(pContext);
} }
...@@ -128,6 +130,8 @@ HttpContext *httpCreateContext(SOCKET fd) { ...@@ -128,6 +130,8 @@ HttpContext *httpCreateContext(SOCKET fd) {
// set the ref to 0 // set the ref to 0
taosCacheRelease(tsHttpServer.contextCache, (void **)&ppContext, false); taosCacheRelease(tsHttpServer.contextCache, (void **)&ppContext, false);
pthread_mutex_init(&pContext->ctxMutex, NULL);
return pContext; return pContext;
} }
......
...@@ -45,15 +45,14 @@ bool httpProcessData(HttpContext* pContext) { ...@@ -45,15 +45,14 @@ bool httpProcessData(HttpContext* pContext) {
httpTrace("context:%p, fd:%d, process options request", pContext, pContext->fd); httpTrace("context:%p, fd:%d, process options request", pContext, pContext->fd);
httpSendOptionResp(pContext, "process options request success"); httpSendOptionResp(pContext, "process options request success");
} else { } else {
if (!httpDecodeRequest(pContext)) { pthread_mutex_lock(&pContext->ctxMutex);
/*
* httpCloseContextByApp has been called when parsing the error if (httpDecodeRequest(pContext)) {
*/
// httpCloseContextByApp(pContext);
} else {
httpClearParser(pContext->parser); httpClearParser(pContext->parser);
httpProcessRequest(pContext); httpProcessRequest(pContext);
} }
pthread_mutex_unlock(&pContext->ctxMutex);
} }
return true; return true;
......
...@@ -406,7 +406,14 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) { ...@@ -406,7 +406,14 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int32_t code) {
if (pContext->session == NULL) { if (pContext->session == NULL) {
httpSendErrorResp(pContext, TSDB_CODE_HTTP_SESSION_FULL); httpSendErrorResp(pContext, TSDB_CODE_HTTP_SESSION_FULL);
} else { } else {
// httpProcessRequestCb called by another thread and a subsequent thread calls this
// function again, if this function called by httpProcessRequestCb executes memset
// just before the subsequent thread executes *Cmd function, nativSql will be NULL
pthread_mutex_lock(&pContext->ctxMutex);
httpExecCmd(pContext); httpExecCmd(pContext);
pthread_mutex_unlock(&pContext->ctxMutex);
} }
} }
......
...@@ -54,27 +54,36 @@ class TDTestCase: ...@@ -54,27 +54,36 @@ class TDTestCase:
binPath = buildPath + "/build/bin/" binPath = buildPath + "/build/bin/"
if(threadID == 0): if(threadID == 0):
os.system("%staosdemo -y -t %d -n %d -b INT,INT,INT,INT -m t" % print("%staosdemo -y -t %d -n %d -b INT,INT,INT,INT" %
(binPath, self.numberOfTables, self.numberOfRecords))
os.system("%staosdemo -y -t %d -n %d -b INT,INT,INT,INT" %
(binPath, self.numberOfTables, self.numberOfRecords)) (binPath, self.numberOfTables, self.numberOfRecords))
if(threadID == 1): if(threadID == 1):
time.sleep(2) time.sleep(2)
print("use test") print("use test")
while True: max_try = 100
count = 0
while (count < max_try):
try: try:
tdSql.execute("use test") tdSql.execute("use test")
break break
except Exception as e: except Exception as e:
tdLog.info("use database test failed") tdLog.info("use database test failed")
time.sleep(1) time.sleep(2)
count += 1
print("try %d times" % count)
continue continue
# check if all the tables have heen created # check if all the tables have heen created
while True: count = 0
while (count < max_try):
try: try:
tdSql.query("show tables") tdSql.query("show tables")
except Exception as e: except Exception as e:
tdLog.info("show tables test failed") tdLog.info("show tables test failed")
time.sleep(1) time.sleep(2)
count += 1
print("try %d times" % count)
continue continue
rows = tdSql.queryRows rows = tdSql.queryRows
...@@ -83,13 +92,17 @@ class TDTestCase: ...@@ -83,13 +92,17 @@ class TDTestCase:
break break
time.sleep(1) time.sleep(1)
# check if there are any records in the last created table # check if there are any records in the last created table
while True: count = 0
while (count < max_try):
print("query started") print("query started")
print("try %d times" % count)
try: try:
tdSql.query("select * from test.t7") tdSql.query("select * from test.d7")
except Exception as e: except Exception as e:
tdLog.info("select * test failed") tdLog.info("select * test failed")
time.sleep(2) time.sleep(2)
count += 1
print("try %d times" % count)
continue continue
rows = tdSql.queryRows rows = tdSql.queryRows
...@@ -100,8 +113,8 @@ class TDTestCase: ...@@ -100,8 +113,8 @@ class TDTestCase:
print("alter table test.meters add column c10 int") print("alter table test.meters add column c10 int")
tdSql.execute("alter table test.meters add column c10 int") tdSql.execute("alter table test.meters add column c10 int")
print("insert into test.t7 values (now, 1, 2, 3, 4, 0)") print("insert into test.d7 values (now, 1, 2, 3, 4, 0)")
tdSql.execute("insert into test.t7 values (now, 1, 2, 3, 4, 0)") tdSql.execute("insert into test.d7 values (now, 1, 2, 3, 4, 0)")
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册