提交 50bec3f6 编写于 作者: R Rebecca Ysteboe 提交者: Oleg Nenashev

[JENKINS-38711] Add UncaughtExceptionHandler to remoting related threads (#2986)

* [JENKINS-38711] Add UncaughtExceptionHandler to remoting related threads

* [JENKINS-38711] Restart listener thread if it fails

* [JENKINS-38711] Reschedule the thread on a delay

* [JENKINS-38711] Allow the listener to actually be restarted

* [JENKINS-38711] Minor updates based on feedback
上级 253b0c8b
......@@ -30,6 +30,8 @@ import java.io.Writer;
import java.nio.charset.Charset;
import java.security.interfaces.RSAPublicKey;
import javax.annotation.Nullable;
import hudson.model.AperiodicWork;
import jenkins.model.Jenkins;
import jenkins.model.identity.InstanceIdentityProvider;
import jenkins.util.SystemProperties;
......@@ -53,6 +55,7 @@ import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
......@@ -98,6 +101,11 @@ public final class TcpSlaveAgentListener extends Thread {
throw (BindException)new BindException("Failed to listen on port "+port+" because it's already in use.").initCause(e);
}
this.configuredPort = port;
setUncaughtExceptionHandler((t, e) -> {
LOGGER.log(Level.SEVERE, "Uncaught exception in TcpSlaveAgentListener " + t + ", attempting to reschedule thread", e);
shutdown();
TcpSlaveAgentListenerRescheduler.schedule(t, port, e);
});
LOGGER.log(Level.FINE, "TCP agent listener started on port {0}", getPort());
......@@ -155,7 +163,14 @@ public final class TcpSlaveAgentListener extends Thread {
// we take care of buffering on our own
s.setTcpNoDelay(true);
new ConnectionHandler(s).start();
new ConnectionHandler(s, new ConnectionHandlerFailureCallback(this, configuredPort) {
@Override
public void run(Throwable cause) {
LOGGER.log(Level.WARNING, "Connection handler failed, restarting listener", cause);
shutdown();
TcpSlaveAgentListenerRescheduler.schedule(getParentThread(), getPort(), cause);
}
}).start();
}
} catch (IOException e) {
if(!shuttingDown) {
......@@ -194,12 +209,21 @@ public final class TcpSlaveAgentListener extends Thread {
*/
private final int id;
public ConnectionHandler(Socket s) {
public ConnectionHandler(Socket s, ConnectionHandlerFailureCallback parentTerminator) {
this.s = s;
synchronized(getClass()) {
id = iotaGen++;
}
setName("TCP agent connection handler #"+id+" with "+s.getRemoteSocketAddress());
setUncaughtExceptionHandler((t, e) -> {
LOGGER.log(Level.SEVERE, "Uncaught exception in TcpSlaveAgentListener ConnectionHandler " + t, e);
try {
s.close();
parentTerminator.run(e);
} catch (IOException e1) {
LOGGER.log(Level.WARNING, "Could not close socket after unexpected thread death", e1);
}
});
}
@Override
......@@ -301,6 +325,27 @@ public final class TcpSlaveAgentListener extends Thread {
}
}
// This is essentially just to be able to pass the parent thread into the callback, as it can't access it otherwise
private abstract class ConnectionHandlerFailureCallback {
private Thread parentThread;
private int port;
public ConnectionHandlerFailureCallback(Thread parentThread, int port) {
this.parentThread = parentThread;
this.port = port;
}
public int getPort() {
return port;
}
public Thread getParentThread() {
return parentThread;
}
public abstract void run(Throwable cause);
}
/**
* This extension provides a Ping protocol that allows people to verify that the TcpSlaveAgentListener is alive.
* We also use this to wake the acceptor thread on termination.
......@@ -383,6 +428,66 @@ public final class TcpSlaveAgentListener extends Thread {
}
}
/**
* Reschedules the <code>TcpSlaveAgentListener</code> on demand. Disables itself after running.
*/
@Extension
@Restricted(NoExternalUse.class)
public static class TcpSlaveAgentListenerRescheduler extends AperiodicWork {
private Thread originThread;
private int port;
private Throwable cause;
private long recurrencePeriod = 5000;
private boolean isActive;
public TcpSlaveAgentListenerRescheduler(Thread originThread, int port, Throwable cause) {
this.originThread = originThread;
this.port = port;
this.cause = cause;
this.isActive = false;
}
@Override
public long getRecurrencePeriod() {
return recurrencePeriod;
}
@Override
public AperiodicWork getNewInstance() {
return new TcpSlaveAgentListenerRescheduler(originThread, port, cause);
}
@Override
protected void doAperiodicRun() {
if (isActive) {
try {
if (originThread.isAlive()) {
originThread.interrupt();
}
new TcpSlaveAgentListener(port).start();
LOGGER.log(Level.INFO, "Restarted TcpSlaveAgentListener");
isActive = false;
} catch (IOException e) {
LOGGER.log(Level.SEVERE, "Could not reschedule TcpSlaveAgentListener - trying again.", cause);
}
}
}
public static void schedule(Thread originThread, int port, Throwable cause) {
schedule(originThread, port, cause,5000);
}
public static void schedule(Thread originThread, int port, Throwable cause, long approxDelay) {
TcpSlaveAgentListenerRescheduler rescheduler = AperiodicWork.all().get(TcpSlaveAgentListenerRescheduler.class);
rescheduler.originThread = originThread;
rescheduler.port = port;
rescheduler.cause = cause;
rescheduler.recurrencePeriod = approxDelay;
rescheduler.isActive = true;
}
}
/**
* Connection terminated because we are reconnected from the current peer.
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册