diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java index 3f43f57bec393a1e9d27d406d6f377e7a68a1405..c417df66c87b8a3e122b50b20b4b8c1b900822f6 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/ControllerStateListener.java @@ -19,13 +19,13 @@ import org.springframework.dao.DuplicateKeyException; * @date 20/5/14 */ public class ControllerStateListener implements StateChangeListener { - private final static Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStateListener.class); - private Long clusterId; + private final Long clusterId; - private ZkConfigImpl zkConfig; + private final ZkConfigImpl zkConfig; - private ControllerDao controllerDao; + private final ControllerDao controllerDao; public ControllerStateListener(Long clusterId, ZkConfigImpl zkConfig, ControllerDao controllerDao) { this.clusterId = clusterId; @@ -35,8 +35,11 @@ public class ControllerStateListener implements StateChangeListener { @Override public void init() { + if (!checkNodeExist()) { + LOGGER.warn("kafka-controller data not exist, clusterId:{}.", clusterId); + return; + } processControllerChange(); - return; } @Override @@ -49,12 +52,21 @@ public class ControllerStateListener implements StateChangeListener { break; } } catch (Exception e) { - LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.", - clusterId, state, path, e); + LOGGER.error("process controller state change failed, clusterId:{} state:{} path:{}.", clusterId, state, path, e); } } - private void processControllerChange(){ + private boolean checkNodeExist() { + try { + return zkConfig.checkPathExists(ZkPathUtil.CONTROLLER_ROOT_NODE); + } catch (Exception e) { + LOGGER.error("init kafka-controller data failed, clusterId:{}.", clusterId, e); + } + + return false; + } + + private void processControllerChange() { LOGGER.warn("init controllerData or controller change, clusterId:{}.", clusterId); ControllerData controllerData = null; try {