提交 049c0984 编写于 作者: S Steve Yurong Su

optimize session creation

上级 df4e110d
......@@ -174,7 +174,6 @@ public class SessionPool {
// if this method throws an exception, either the server is broken, or the ip/port/user/password
// is incorrect.
@SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning
private Session getSession() throws IoTDBConnectionException {
Session session = queue.poll();
......@@ -183,91 +182,92 @@ public class SessionPool {
}
if (session != null) {
return session;
} else {
long start = System.currentTimeMillis();
boolean canCreate = false;
}
boolean shouldCreate = false;
long start = System.currentTimeMillis();
while (session == null) {
synchronized (this) {
if (size < maxSize) {
// we can create more session
size++;
canCreate = true;
shouldCreate = true;
// but we do it after skip synchronized block because connection a session is time
// consuming.
break;
}
}
if (canCreate) {
// create a new one.
if (logger.isDebugEnabled()) {
logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
}
session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
// we have to wait for someone returns a session.
try {
session.open(enableCompression);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
// have to release the connection...
session.close();
throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
} else {
return session;
}
if (logger.isDebugEnabled()) {
logger.debug("no more sessions can be created, wait... queue.size={}", queue.size());
}
} catch (IoTDBConnectionException e) {
// if exception, we will throw the exception.
// Meanwhile, we have to set size--
synchronized (this) {
size--;
// we do not need to notifyAll as any waited thread can continue to work after waked up.
this.notify();
if (logger.isDebugEnabled()) {
logger.debug("open session failed, reduce the count and notify others...");
this.wait(1000);
long time = timeout < 60_000 ? timeout : 60_000;
if (System.currentTimeMillis() - start > time) {
logger.warn(
"the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
(System.currentTimeMillis() - start) / 1000,
ip,
port,
user,
password);
logger.warn(
"current occupied size {}, queue size {}, considered size {} ",
occupied.size(),
queue.size(),
size);
if (System.currentTimeMillis() - start > timeout) {
throw new IoTDBConnectionException(
String.format("timeout to get a connection from %s:%s", ip, port));
}
}
throw e;
} catch (InterruptedException e) {
logger.error("the SessionPool is damaged", e);
Thread.currentThread().interrupt();
}
} else {
while (session == null) {
synchronized (this) {
if (closed) {
throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
}
// we have to wait for someone returns a session.
try {
if (logger.isDebugEnabled()) {
logger.debug(
"no more sessions can be created, wait... queue.size={}", queue.size());
}
this.wait(1000);
long time = timeout < 60_000 ? timeout : 60_000;
if (System.currentTimeMillis() - start > time) {
logger.warn(
"the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}",
(System.currentTimeMillis() - start) / 1000,
ip,
port,
user,
password);
logger.warn(
"current occupied size {}, queue size {}, considered size {} ",
occupied.size(),
queue.size(),
size);
if (System.currentTimeMillis() - start > timeout) {
throw new IoTDBConnectionException(
String.format("timeout to get a connection from %s:%s", ip, port));
}
}
} catch (InterruptedException e) {
logger.error("the SessionPool is damaged", e);
Thread.currentThread().interrupt();
}
session = queue.poll();
session = queue.poll();
if (closed) {
throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
}
}
}
if (shouldCreate) {
// create a new one.
if (logger.isDebugEnabled()) {
logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password);
}
session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader);
try {
session.open(enableCompression);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
// have to release the connection...
session.close();
throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
}
}
return session;
} catch (IoTDBConnectionException e) {
// if exception, we will throw the exception.
// Meanwhile, we have to set size--
synchronized (this) {
size--;
// we do not need to notifyAll as any waited thread can continue to work after waked up.
this.notify();
if (logger.isDebugEnabled()) {
logger.debug("open session failed, reduce the count and notify others...");
}
}
throw e;
}
}
return session;
}
public int currentAvailableSize() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册