diff --git a/core/src/main/java/hudson/TcpSlaveAgentListener.java b/core/src/main/java/hudson/TcpSlaveAgentListener.java index 602bb31c724e278a5ccd9144292016805ec690ee..fea34b56b4a0bb56502bebb11a3ef5722abf8ecc 100644 --- a/core/src/main/java/hudson/TcpSlaveAgentListener.java +++ b/core/src/main/java/hudson/TcpSlaveAgentListener.java @@ -30,6 +30,8 @@ import java.io.Writer; import java.nio.charset.Charset; import java.security.interfaces.RSAPublicKey; import javax.annotation.Nullable; + +import hudson.model.AperiodicWork; import jenkins.model.Jenkins; import jenkins.model.identity.InstanceIdentityProvider; import jenkins.util.SystemProperties; @@ -53,6 +55,7 @@ import java.net.Socket; import java.nio.channels.ServerSocketChannel; import java.util.logging.Level; import java.util.logging.Logger; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; @@ -98,6 +101,11 @@ public final class TcpSlaveAgentListener extends Thread { throw (BindException)new BindException("Failed to listen on port "+port+" because it's already in use.").initCause(e); } this.configuredPort = port; + setUncaughtExceptionHandler((t, e) -> { + LOGGER.log(Level.SEVERE, "Uncaught exception in TcpSlaveAgentListener " + t + ", attempting to reschedule thread", e); + shutdown(); + TcpSlaveAgentListenerRescheduler.schedule(t, port, e); + }); LOGGER.log(Level.FINE, "TCP agent listener started on port {0}", getPort()); @@ -155,7 +163,14 @@ public final class TcpSlaveAgentListener extends Thread { // we take care of buffering on our own s.setTcpNoDelay(true); - new ConnectionHandler(s).start(); + new ConnectionHandler(s, new ConnectionHandlerFailureCallback(this, configuredPort) { + @Override + public void run(Throwable cause) { + LOGGER.log(Level.WARNING, "Connection handler failed, restarting listener", cause); + shutdown(); + TcpSlaveAgentListenerRescheduler.schedule(getParentThread(), getPort(), cause); + } + }).start(); } } catch (IOException e) { if(!shuttingDown) { @@ -194,12 +209,21 @@ public final class TcpSlaveAgentListener extends Thread { */ private final int id; - public ConnectionHandler(Socket s) { + public ConnectionHandler(Socket s, ConnectionHandlerFailureCallback parentTerminator) { this.s = s; synchronized(getClass()) { id = iotaGen++; } setName("TCP agent connection handler #"+id+" with "+s.getRemoteSocketAddress()); + setUncaughtExceptionHandler((t, e) -> { + LOGGER.log(Level.SEVERE, "Uncaught exception in TcpSlaveAgentListener ConnectionHandler " + t, e); + try { + s.close(); + parentTerminator.run(e); + } catch (IOException e1) { + LOGGER.log(Level.WARNING, "Could not close socket after unexpected thread death", e1); + } + }); } @Override @@ -301,6 +325,27 @@ public final class TcpSlaveAgentListener extends Thread { } } + // This is essentially just to be able to pass the parent thread into the callback, as it can't access it otherwise + private abstract class ConnectionHandlerFailureCallback { + private Thread parentThread; + private int port; + + public ConnectionHandlerFailureCallback(Thread parentThread, int port) { + this.parentThread = parentThread; + this.port = port; + } + + public int getPort() { + return port; + } + + public Thread getParentThread() { + return parentThread; + } + + public abstract void run(Throwable cause); + } + /** * This extension provides a Ping protocol that allows people to verify that the TcpSlaveAgentListener is alive. * We also use this to wake the acceptor thread on termination. @@ -383,6 +428,66 @@ public final class TcpSlaveAgentListener extends Thread { } } + /** + * Reschedules the TcpSlaveAgentListener on demand. Disables itself after running. + */ + @Extension + @Restricted(NoExternalUse.class) + public static class TcpSlaveAgentListenerRescheduler extends AperiodicWork { + private Thread originThread; + private int port; + private Throwable cause; + private long recurrencePeriod = 5000; + private boolean isActive; + + public TcpSlaveAgentListenerRescheduler(Thread originThread, int port, Throwable cause) { + this.originThread = originThread; + this.port = port; + this.cause = cause; + this.isActive = false; + } + + @Override + public long getRecurrencePeriod() { + return recurrencePeriod; + } + + @Override + public AperiodicWork getNewInstance() { + return new TcpSlaveAgentListenerRescheduler(originThread, port, cause); + } + + @Override + protected void doAperiodicRun() { + if (isActive) { + try { + if (originThread.isAlive()) { + originThread.interrupt(); + } + new TcpSlaveAgentListener(port).start(); + LOGGER.log(Level.INFO, "Restarted TcpSlaveAgentListener"); + isActive = false; + } catch (IOException e) { + LOGGER.log(Level.SEVERE, "Could not reschedule TcpSlaveAgentListener - trying again.", cause); + } + } + } + + public static void schedule(Thread originThread, int port, Throwable cause) { + schedule(originThread, port, cause,5000); + } + + public static void schedule(Thread originThread, int port, Throwable cause, long approxDelay) { + TcpSlaveAgentListenerRescheduler rescheduler = AperiodicWork.all().get(TcpSlaveAgentListenerRescheduler.class); + rescheduler.originThread = originThread; + rescheduler.port = port; + rescheduler.cause = cause; + rescheduler.recurrencePeriod = approxDelay; + rescheduler.isActive = true; + } + } + + /** * Connection terminated because we are reconnected from the current peer. */