未验证 提交 a41c8552 编写于 作者: H HouliangQi 提交者: GitHub

Fix the CI issue introduced by thrift0.14 and the thrift server is not closed...

Fix the CI issue introduced by thrift0.14 and the thrift server is not closed properly in SessionPoolTest. (#3198)

Fix the CI issue introduced by thrift0.14 and the thrift server is not closed properly in SessionPoolTest.
上级 725cc68f
......@@ -96,12 +96,11 @@ jobs:
.\bootstrap.bat ; `
.\b2.exe
- name: Install OpenSSL
run: Invoke-WebRequest https://mirror.firedaemon.com/OpenSSL/openssl-1.1.1k.zip -OutFile D:\a\cpp\openssl-1.1.1k.zip ; `
Expand-Archive D:\a\cpp\openssl-1.1.1k.zip -DestinationPath D:\a\cpp ; `
[Environment]::SetEnvironmentVariable("Path", $env:Path + ";D:\a\cpp\openssl-1.1\x64\bin", "User") ; `
- name: Add Flex and Bison Path
run: Invoke-WebRequest https://slproweb.com/download/Win64OpenSSL-1_1_1k.exe -OutFile D:\a\cpp ; `
[Environment]::SetEnvironmentVariable("Path", $env:Path + ";D:\a\cpp", "User") ; `
- name: Add Flex and Bison Path and OpenSSL
shell: bash
run: cd /d/a/cpp && unzip win_flex_bison.zip && mv win_flex.exe flex.exe && mv win_bison.exe bison.exe && echo 'export PATH=/d/a/cpp:$PATH' >> ~/.bash_profile && source ~/.bash_profile
run: cd /d/a/cpp && unzip win_flex_bison.zip && mv win_flex.exe flex.exe && mv win_bison.exe bison.exe && mv Win64OpenSSL-1_1_1k.exe openssl.exe && echo 'export PATH=/d/a/cpp:$PATH' >> ~/.bash_profile && source ~/.bash_profile
- name: Test with Maven
shell: bash
run: source ~/.bash_profile && mvn -B clean integration-test -P compile-cpp -Dboost.include.dir=/d/a/cpp/boost_1_72_0 -Dboost.library.dir=/d/a/cpp/boost_1_72_0/stage/lib -Dtsfile.test.skip=true -Djdbc.test.skip=true -Diotdb.test.skip=true -Dtest.port.closed=true -Denforcer.skip=true -pl server,client-cpp,example/client-cpp-example -am
......@@ -241,7 +241,7 @@ public class EnvironmentUtils {
/** disable memory control</br> this function should be called before all code in the setup */
public static void envSetUp() {
logger.warn("EnvironmentUtil setup...");
IoTDBDescriptor.getInstance().getConfig().setThriftServerAwaitTimeForStopService(0);
IoTDBDescriptor.getInstance().getConfig().setThriftServerAwaitTimeForStopService(60);
// we do not start 8181 port in test.
IoTDBDescriptor.getInstance().getConfig().setEnableMetricService(false);
IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(Integer.MAX_VALUE);
......
......@@ -19,6 +19,8 @@
package org.apache.iotdb.session.pool;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
......@@ -28,6 +30,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
......@@ -44,8 +48,15 @@ import static org.junit.Assert.fail;
// this test is not for testing the correctness of Session API. So we just implement one of the API.
public class SessionPoolTest {
private static final Logger logger = LoggerFactory.getLogger(SessionPoolTest.class);
private final CompactionStrategy defaultCompaction =
IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy();
@Before
public void setUp() throws Exception {
IoTDBDescriptor.getInstance()
.getConfig()
.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
......@@ -54,6 +65,7 @@ public class SessionPoolTest {
@After
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setCompactionStrategy(defaultCompaction);
}
@Test
......@@ -72,20 +84,21 @@ public class SessionPoolTest {
Collections.singletonList(TSDataType.INT64),
Collections.singletonList(3L));
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
fail(e.getMessage());
}
});
}
service.shutdown();
try {
assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(pool.currentAvailableSize() <= 3);
assertEquals(0, pool.currentOccupiedSize());
} catch (InterruptedException e) {
e.printStackTrace();
fail();
logger.error("insert failed", e);
fail(e.getMessage());
} finally {
pool.close();
}
assertTrue(pool.currentAvailableSize() <= 3);
assertEquals(0, pool.currentOccupiedSize());
pool.close();
}
@Test
......@@ -99,11 +112,12 @@ public class SessionPoolTest {
Collections.singletonList("s"),
Collections.singletonList(TSDataType.INT64),
Collections.singletonList(3L));
assertEquals(1, pool.currentAvailableSize());
} catch (IoTDBConnectionException | StatementExecutionException e) {
// do nothing
} finally {
pool.close();
}
assertEquals(1, pool.currentAvailableSize());
pool.close();
}
@Test
......@@ -122,7 +136,7 @@ public class SessionPoolTest {
// this is incorrect becasue wrapper is not closed.
// so all other 7 queries will be blocked
} catch (IoTDBConnectionException | StatementExecutionException e) {
fail();
fail(e.getMessage());
}
});
}
......@@ -132,10 +146,11 @@ public class SessionPoolTest {
assertEquals(0, pool.currentAvailableSize());
assertTrue(pool.currentOccupiedSize() <= 3);
} catch (InterruptedException e) {
e.printStackTrace();
fail();
logger.error("incorrectExecuteQueryStatement failed,", e);
fail(e.getMessage());
} finally {
pool.close();
}
pool.close();
}
@Test
......@@ -158,8 +173,8 @@ public class SessionPoolTest {
pool.executeQueryStatement("select * from root.sg1.d1 where time = " + no);
pool.closeResultSet(wrapper);
} catch (Exception e) {
e.printStackTrace();
fail();
logger.error("correctQuery failed", e);
fail(e.getMessage());
}
});
}
......@@ -169,13 +184,13 @@ public class SessionPoolTest {
assertTrue(pool.currentAvailableSize() <= 3);
assertEquals(0, pool.currentOccupiedSize());
} catch (InterruptedException e) {
e.printStackTrace();
fail();
logger.error("correctQuery failed", e);
fail(e.getMessage());
}
}
@Test
public void executeRawDataQuery() throws InterruptedException {
public void executeRawDataQuery() {
SessionPool pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3);
ExecutorService service = Executors.newFixedThreadPool(10);
write10Data(pool, true);
......@@ -192,16 +207,21 @@ public class SessionPoolTest {
}
pool.closeResultSet(wrapper);
} catch (Exception e) {
e.printStackTrace();
fail();
logger.error("executeRawDataQuery", e);
fail(e.getMessage());
}
});
}
service.shutdown();
assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(pool.currentAvailableSize() <= 3);
assertEquals(0, pool.currentOccupiedSize());
pool.close();
try {
service.shutdown();
assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
assertTrue(pool.currentAvailableSize() <= 3);
assertEquals(0, pool.currentOccupiedSize());
} catch (InterruptedException e) {
fail(e.getMessage());
} finally {
pool.close();
}
}
@Test
......@@ -219,28 +239,42 @@ public class SessionPoolTest {
}
} catch (IoTDBConnectionException e) {
pool.closeResultSet(wrapper);
pool.close();
EnvironmentUtils.stopDaemon();
EnvironmentUtils.reactiveDaemon();
pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
correctQuery(pool);
pool.close();
return;
} catch (StatementExecutionException e) {
// I do not why? the first call wrapper.hasNext() will cause InterruptedException and IoTDB
// warps
// it as StatementExecutionException, the second call can make sure that the thrift server's
// connection is closed.
// After receiving the stop request, thrift calls shutdownNow() to process the executing task.
// However, when the executing task is blocked, it will report InterruptedException error.
// And IoTDB warps it as one StatementExecutionException.
// If the thrift task thread is running, the thread will not be affected and will continue to
// run, only if the interrupt flag of the thread is set to true. So here, we call the close
// function on the client and wait for some time before the thrift server can exit normally.
try {
while (wrapper.hasNext()) {
wrapper.next();
}
} catch (IoTDBConnectionException ec) {
pool.closeResultSet(wrapper);
pool.close();
EnvironmentUtils.stopDaemon();
EnvironmentUtils.reactiveDaemon();
pool = new SessionPool("127.0.0.1", 6667, "root", "root", 3, 1, 6000, false, null, false);
correctQuery(pool);
pool.close();
} catch (StatementExecutionException es) {
fail("should be TTransportException but get an exception: " + e.getMessage());
}
return;
} finally {
if (wrapper != null) {
pool.closeResultSet(wrapper);
}
pool.close();
}
fail("should throw exception but not");
}
......@@ -260,27 +294,32 @@ public class SessionPoolTest {
while (wrapper.hasNext()) {
wrapper.next();
}
pool.closeResultSet(wrapper);
assertEquals(1, pool.currentAvailableSize());
assertEquals(0, pool.currentOccupiedSize());
} catch (IoTDBConnectionException | StatementExecutionException e) {
e.printStackTrace();
fail();
logger.error("tryIfTheServerIsRestartButDataIsGotten", e);
fail(e.getMessage());
} finally {
pool.close();
}
pool.close();
}
@Test
public void restart() throws Exception {
public void restart() {
SessionPool pool =
new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
write10Data(pool, true);
// stop the server.
pool.close();
EnvironmentUtils.stopDaemon();
pool = new SessionPool("127.0.0.1", 6667, "root", "root", 1, 1, 1000, false, null, false);
// all this ten data will fail.
write10Data(pool, false);
// restart the server
EnvironmentUtils.reactiveDaemon();
write10Data(pool, true);
pool.close();
}
private void write10Data(SessionPool pool, boolean failWhenThrowException) {
......@@ -295,7 +334,7 @@ public class SessionPoolTest {
} catch (IoTDBConnectionException | StatementExecutionException e) {
// will fail this 10 times.
if (failWhenThrowException) {
fail();
fail(e.getMessage());
}
}
}
......@@ -316,7 +355,7 @@ public class SessionPoolTest {
} catch (IoTDBConnectionException e) {
Assert.assertEquals("Session pool is closed", e.getMessage());
} catch (StatementExecutionException e) {
fail();
fail(e.getMessage());
}
// some other test cases are not covered:
// e.g., thread A created a new session, but not returned; thread B close the pool; A get the
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册