未验证 提交 b3f21a3e 编写于 作者: X Xiangdong Huang 提交者: GitHub

[IOTDB-832] fix reconnection failure bug in sessionPool (#1610)

* [iotdb-832] fix sessionPool logic when reconnection failed.
Co-authored-by: Nxiangdong huang <sainthxd@gmail.com>
Co-authored-by: Nqiaojialin <646274302@qq.com>
上级 ffdf15f8
......@@ -66,6 +66,7 @@ Here is a list of Status Code and related message:
|314|TSFILE_PROCESSOR_ERROR|TsFile processor related error|
|315|PATH_ILLEGAL|Illegal path|
|316|LOAD_FILE_ERROR|Meet error while loading file|
|317|STORAGE_GROUP_NOT_READY| The storage group is in recovery mode, not ready fore accepting read/write operation|
|400|EXECUTE_STATEMENT_ERROR|Execute statement error|
|401|SQL_PARSE_ERROR|Meet error while parsing SQL|
|402|GENERATE_TIME_ZONE_ERROR|Meet error while generating time zone|
......
......@@ -66,6 +66,7 @@ try {
|314|TSFILE_PROCESSOR_ERROR|TsFile处理器相关错误|
|315|PATH_ILLEGAL|路径不合法|
|316|LOAD_FILE_ERROR|加载文件错误|
|317|STORAGE_GROUP_NOT_READY| 存储组还在恢复中,还不能接受读写操作 |
|400|EXECUTE_STATEMENT_ERROR|执行语句错误|
|401|SQL_PARSE_ERROR|SQL语句分析错误|
|402|GENERATE_TIME_ZONE_ERROR|生成时区错误|
......
......@@ -47,6 +47,7 @@ import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.slf4j.Logger;
......@@ -293,7 +294,8 @@ public class StorageEngine implements IService {
}
} else {
// not finished recover, refuse the request
throw new StorageEngineException("the sg " + storageGroupName + " may not ready now, please wait and retry later");
throw new StorageEngineException("the sg " + storageGroupName + " may not ready now, please wait and retry later",
TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
}
}
return processor;
......
......@@ -43,6 +43,7 @@ public enum TSStatusCode {
TSFILE_PROCESSOR_ERROR(314),
PATH_ILLEGAL(315),
LOAD_FILE_ERROR(316),
STORAGE_GROUP_NOT_READY(317),
EXECUTE_STATEMENT_ERROR(400),
SQL_PARSE_ERROR(401),
......
......@@ -115,10 +115,12 @@ public class SessionPool {
//we have to wait for someone returns a session.
try {
this.wait(1000);
if (System.currentTimeMillis() - start > 60_000) {
long time = timeout < 60_000 ? timeout : 60_000;
if (System.currentTimeMillis() - start > time) {
logger.warn(
"the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
(System.currentTimeMillis() - start) / 1000, ip, port, user, password);
logger.warn("current occupied size {}, queue size {}, considered size {} ",occupied.size(), queue.size(), size);
if (System.currentTimeMillis() - start > timeout) {
throw new IoTDBConnectionException(
String.format("timeout to get a connection from %s:%s", ip, port));
......@@ -139,7 +141,16 @@ public class SessionPool {
logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
}
session = new Session(ip, port, user, password, fetchSize);
session.open(enableCompression);
try {
session.open(enableCompression);
} catch (IoTDBConnectionException e) {
//if exception, we will throw the exception.
//Meanwhile, we have to set size--
synchronized (this) {
size --;
}
throw e;
}
return session;
}
}
......
......@@ -99,15 +99,7 @@ public class SessionPoolTest {
public void incorrectExecuteQueryStatement() {
SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
try {
pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
Collections.singletonList(TSDataType.INT64),
Collections.singletonList((long) i));
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
}
}
write10Data(pool, true);
//now let's query
for (int i = 0; i < 10; i++) {
final int no = i;
......@@ -143,15 +135,7 @@ public class SessionPoolTest {
private void correctQuery(SessionPool pool) {
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
try {
pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
Collections.singletonList(TSDataType.INT64),
Collections.singletonList((long) i));
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
}
}
write10Data(pool, true);
//now let's query
for (int i = 0; i < 10; i++) {
final int no = i;
......@@ -180,15 +164,7 @@ public class SessionPoolTest {
@Test
public void tryIfTheServerIsRestart() {
SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false);
for (int i = 0; i < 10; i++) {
try {
pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
Collections.singletonList(TSDataType.INT64),
Collections.singletonList((long) i));
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
}
}
write10Data(pool, true);
SessionDataSetWrapper wrapper = null;
try {
wrapper = pool.executeQueryStatement("select * from root.sg1.d1 where time > 1");
......@@ -212,15 +188,7 @@ public class SessionPoolTest {
@Test
public void tryIfTheServerIsRestartButDataIsGotten() {
SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 60000, false);
for (int i = 0; i < 10; i++) {
try {
pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
Collections.singletonList(TSDataType.INT64),
Collections.singletonList((long) i));
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
}
}
write10Data(pool, true);
assertEquals(1, pool.currentAvailableSize());
SessionDataSetWrapper wrapper = null;
try {
......@@ -240,4 +208,33 @@ public class SessionPoolTest {
pool.close();
}
@Test
public void restart() throws Exception {
SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false);
write10Data(pool, true);
//stop the server.
EnvironmentUtils.stopDaemon();
//all this ten data will fail.
write10Data(pool, false);
//restart the server
EnvironmentUtils.reactiveDaemon();
write10Data(pool, true);
}
private void write10Data(SessionPool pool, boolean failWhenThrowException) {
for (int i = 0; i < 10; i++) {
try {
pool.insertRecord("root.sg1.d1", i, Collections.singletonList("s" + i),
Collections.singletonList(TSDataType.INT64),
Collections.singletonList((long) i));
} catch (IoTDBConnectionException | StatementExecutionException e) {
//will fail this 10 times.
if (failWhenThrowException) {
fail();
}
}
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册