diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java index 3cdc9ab972050521be3dbdf7a9688585890de9be..c7a53ebdc0bf20fe29334cdf3245b7466de6f97e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.service.zk; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; @@ -34,44 +35,62 @@ import java.util.concurrent.atomic.AtomicBoolean; public class ZKServer { private static final Logger logger = LoggerFactory.getLogger(ZKServer.class); - private static volatile PublicZooKeeperServerMain zkServer = null; - public static final int DEFAULT_ZK_TEST_PORT = 2181; - private static String dataDir = null; + private final AtomicBoolean isStarted = new AtomicBoolean(false); + + private PublicZooKeeperServerMain zooKeeperServerMain = null; + + private int port; - private static final AtomicBoolean isStarted = new AtomicBoolean(false); + private String dataDir = null; + + private String prefix; public static void main(String[] args) { - if(!isStarted()){ - ZKServer.start(); - - /** - * register hooks, which are called before the process exits - */ - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - stop(); - } - })); - }else{ - logger.info("zk server aleady started"); + ZKServer zkServer; + if (args.length == 0) { + zkServer = new ZKServer(); + } else if (args.length == 1){ + zkServer = new ZKServer(Integer.valueOf(args[0]), ""); + } else { + zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]); + } + zkServer.registerHook(); + zkServer.start(); + } + + public ZKServer() { + this(DEFAULT_ZK_TEST_PORT, ""); + } + + public ZKServer(int port, String prefix) { + this.port = port; + if (prefix != null && prefix.contains("/")) { + throw new IllegalArgumentException("The prefix of path may not have '/'"); } + this.prefix = (prefix == null ? null : prefix.trim()); + } + + private void registerHook() { + /** + * register hooks, which are called before the process exits + */ + Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); } /** * start service */ - public static void start() { + public void start() { try { - startLocalZkServer(DEFAULT_ZK_TEST_PORT); + startLocalZkServer(port); } catch (Exception e) { - logger.error("Failed to start ZK: " + e); + logger.error("Failed to start ZK ", e); } } - public static boolean isStarted(){ + public boolean isStarted(){ return isStarted.get(); } @@ -94,8 +113,12 @@ public class ZKServer { * * @param port The port to listen on */ - public static void startLocalZkServer(final int port) { - String zkDataDir = System.getProperty("user.dir") +"/zookeeper_data"; + public void startLocalZkServer(final int port) { + String zkDataDir = System.getProperty("user.dir") + (StringUtils.isEmpty(prefix) ? StringUtils.EMPTY : ("/" + prefix)) + "/zookeeper_data"; + File file = new File(zkDataDir); + if (file.exists()) { + logger.warn("The path of zk server exists"); + } logger.info("zk server starting, data dir path:{}" , zkDataDir); startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60"); } @@ -108,31 +131,29 @@ public class ZKServer { * @param tickTime zk tick time * @param maxClientCnxns zk max client connections */ - private static synchronized void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) { - if (zkServer != null) { - throw new RuntimeException("Zookeeper server is already started!"); - } - zkServer = new PublicZooKeeperServerMain(); - logger.info("Zookeeper data path : {} ", dataDirPath); - dataDir = dataDirPath; - final String[] args = new String[]{Integer.toString(port), dataDirPath, Integer.toString(tickTime), maxClientCnxns}; + private void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) { + if (isStarted.compareAndSet(false, true)) { + zooKeeperServerMain = new PublicZooKeeperServerMain(); + logger.info("Zookeeper data path : {} ", dataDirPath); + dataDir = dataDirPath; + final String[] args = new String[]{Integer.toString(port), dataDirPath, Integer.toString(tickTime), maxClientCnxns}; - try { - logger.info("Zookeeper server started "); - isStarted.compareAndSet(false, true); - - zkServer.initializeAndRun(args); - } catch (QuorumPeerConfig.ConfigException e) { - logger.warn("Caught exception while starting ZK", e); - } catch (IOException e) { - logger.warn("Caught exception while starting ZK", e); + try { + logger.info("Zookeeper server started "); + isStarted.compareAndSet(false, true); + + zooKeeperServerMain.initializeAndRun(args); + } catch (QuorumPeerConfig.ConfigException | IOException e) { + logger.warn("Caught exception while starting ZK", e); + throw new RuntimeException(e); + } } } /** * Stops a local Zk instance, deleting its data directory */ - public static void stop() { + public void stop() { try { stopLocalZkServer(true); logger.info("zk server stopped"); @@ -147,19 +168,21 @@ public class ZKServer { * * @param deleteDataDir Whether or not to delete the data directory */ - private static synchronized void stopLocalZkServer(final boolean deleteDataDir) { - if (zkServer != null) { + private void stopLocalZkServer(final boolean deleteDataDir) { + if (isStarted.compareAndSet(true, false)) { try { - zkServer.shutdown(); - zkServer = null; + if (zooKeeperServerMain == null) { + return; + } + zooKeeperServerMain.shutdown(); + zooKeeperServerMain = null; if (deleteDataDir) { org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir)); } - isStarted.compareAndSet(true, false); } catch (Exception e) { logger.warn("Caught exception while stopping ZK server", e); throw new RuntimeException(e); } } } -} \ No newline at end of file +} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java index 42b942b907ffb34ece8dfd536a35be767c24c8ce..10be65e90a8cf081a0e631c892a3677ef0532f1c 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZKServerTest.java @@ -18,18 +18,44 @@ package org.apache.dolphinscheduler.service.zk; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -// ZKServer is a process, can't unit test -public class ZKServerTest { +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +public class ZKServerTest { + private static final Logger log = LoggerFactory.getLogger(ZKServerTest.class); @Test - public void isStarted() { - Assert.assertEquals(false, ZKServer.isStarted()); + public void testRunWithDefaultPort() { + AtomicReference zkServer = new AtomicReference<>(); + new Thread(() -> { + zkServer.set(new ZKServer()); + zkServer.get().start(); + }).start(); + try { + TimeUnit.SECONDS.sleep(5); + Assert.assertEquals(true, zkServer.get().isStarted()); + } catch (InterruptedException e) { + log.error("Thread interrupted", e); + } + zkServer.get().stop(); } @Test - public void stop() { - ZKServer.stop(); + public void testRunWithCustomPort() { + AtomicReference zkServer = new AtomicReference<>(); + new Thread(() -> { + zkServer.set(new ZKServer(2183, null)); + zkServer.get().start(); + }).start(); + try { + TimeUnit.SECONDS.sleep(5); + Assert.assertEquals(true, zkServer.get().isStarted()); + } catch (InterruptedException e) { + log.error("Thread interrupted", e); + } + zkServer.get().stop(); } } \ No newline at end of file