diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 2baa07f19739b8ef521229038326db46a44e5445..8dcf95789d8639099cbe15a6c706b8bd4e5e54b8 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -206,8 +206,10 @@ public class NodeManager { } private TRuntimeConfiguration getRuntimeConfiguration() { - getPipeManager().getPipePluginCoordinator().lock(); + // getPipeTaskCoordinator.lock() should be called outside the getPipePluginCoordinator().lock() + // to avoid deadlock getPipeManager().getPipeTaskCoordinator().lock(); + getPipeManager().getPipePluginCoordinator().lock(); getTriggerManager().getTriggerInfo().acquireTriggerTableLock(); getUDFManager().getUdfInfo().acquireUDFTableLock(); @@ -226,8 +228,11 @@ public class NodeManager { } finally { getTriggerManager().getTriggerInfo().releaseTriggerTableLock(); getUDFManager().getUdfInfo().releaseUDFTableLock(); - getPipeManager().getPipeTaskCoordinator().unlock(); getPipeManager().getPipePluginCoordinator().unlock(); + // getPipeTaskCoordinator.unlock() should be called outside the + // getPipePluginCoordinator().unlock() + // to avoid deadlock + getPipeManager().getPipeTaskCoordinator().unlock(); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 2c9485c29fc736b553aa50aab9d53320d3189e12..8aaa0948f80a0cae584019a11f8ac4515bfcc97a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -223,22 +223,23 @@ public class PipeTaskInfo implements SnapshotProcessor { dataRegionGroupId, new PipeTaskMeta( new MinimumProgressIndex(), newDataRegionLeader)); - } else { - LOGGER.warn( - "The pipe task meta does not contain the data region group {} or the data region group has already been removed", - dataRegionGroupId); } + // else: + // "The pipe task meta does not contain the data region group {} or + // the data region group has already been removed" } })); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) { + LOGGER.info("Handling pipe meta changes ..."); pipeMetaKeeper.clear(); plan.getPipeMetaList() .forEach( pipeMeta -> { pipeMetaKeeper.addPipeMeta(pipeMeta.getStaticMeta().getPipeName(), pipeMeta); + LOGGER.info("Recording pipe meta: {}", pipeMeta); }); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index 0c218184b5c3dcd420d109ddfd642163f51ddca5..3cbcebe6f6bb9d5ac5906708adc8275e73c43c7d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -100,10 +100,10 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur } @Override - protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException { + protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { LOGGER.info("PipeHandleLeaderChangeProcedure: executeFromHandleOnDataNodes"); - pushPipeMetaToDataNodes(env); + pushPipeMetaToDataNodesIgnoreException(env); } @Override @@ -142,10 +142,10 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur } @Override - protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException { + protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { LOGGER.info("PipeHandleLeaderChangeProcedure: rollbackFromCreateOnDataNodes"); - pushPipeMetaToDataNodes(env); + pushPipeMetaToDataNodesIgnoreException(env); } @Override diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index 8f9f9207ebc84242f5601bdf838065c55e9edef1..39948e9fbb9f717bb9286f77473d068136ec0545 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -138,9 +138,20 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV .getValue() .getProgressIndex() .isAfter(runtimeMetaFromDataNode.getProgressIndex())) { - runtimeMetaOnConfigNode - .getValue() - .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()); + LOGGER.info( + "Updating progress index for (pipe name: {}, consensus group id: {}) ... Progress index on config node: {}, progress index from data node: {}", + pipeMetaOnConfigNode.getStaticMeta().getPipeName(), + runtimeMetaOnConfigNode.getKey(), + runtimeMetaOnConfigNode.getValue().getProgressIndex(), + runtimeMetaFromDataNode.getProgressIndex()); + LOGGER.info( + "Progress index for (pipe name: {}, consensus group id: {}) is updated to {}", + pipeMetaOnConfigNode.getStaticMeta().getPipeName(), + runtimeMetaOnConfigNode.getKey(), + runtimeMetaOnConfigNode + .getValue() + .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex())); + needWriteConsensusOnConfigNodes = true; } @@ -149,6 +160,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV pipeTaskMetaOnConfigNode.clearExceptionMessages(); for (final PipeRuntimeException exception : runtimeMetaFromDataNode.getExceptionMessages()) { + pipeTaskMetaOnConfigNode.trackExceptionMessage(exception); if (exception instanceof PipeRuntimeCriticalException) { @@ -159,6 +171,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV .get() .equals(PipeStatus.STOPPED)) { pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED); + needWriteConsensusOnConfigNodes = true; needPushPipeMetaToDataNodes = true; @@ -181,6 +194,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV .forEach( status -> { status.set(PipeStatus.STOPPED); + needWriteConsensusOnConfigNodes = true; needPushPipeMetaToDataNodes = true; @@ -224,14 +238,14 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV } @Override - protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException { + protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { LOGGER.info("PipeHandleMetaChangeProcedure: executeFromHandleOnDataNodes"); if (!needPushPipeMetaToDataNodes) { return; } - pushPipeMetaToDataNodes(env); + pushPipeMetaToDataNodesIgnoreException(env); } @Override diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java index 167008e2ed35c1cdcf78044a7e4a65b5a4a1a199..a7faa989ab9e988e9771ab55cbb173b894729e1d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java @@ -66,10 +66,10 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 { } @Override - protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException { + protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { LOGGER.info("PipeMetaSyncProcedure: executeFromOperateOnDataNodes"); - pushPipeMetaToDataNodes(env); + pushPipeMetaToDataNodesIgnoreException(env); } @Override @@ -94,10 +94,10 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 { } @Override - protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException { + protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes"); - pushPipeMetaToDataNodes(env); + // do nothing } @Override diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java index 15065499562b83ec0d7fe084e1b46f53f1fa8264..133a74854be3d81ed6c8b7e01f3d96e60aecb8c4 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AbstractOperatePipeProcedureV2.java @@ -200,6 +200,14 @@ public abstract class AbstractOperatePipeProcedureV2 } } + protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv env) { + try { + pushPipeMetaToDataNodes(env); + } catch (Throwable throwable) { + LOGGER.info("Failed to push pipe meta list to data nodes, will retry later.", throwable); + } + } + @Override public void serialize(DataOutputStream stream) throws IOException { super.serialize(stream); diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java index eaa800968235fb3d8a9eda94f217ec2b013aa424..9ce9890ecb415b65aaa807d3017dad8e5001db13 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java @@ -165,7 +165,7 @@ public class SimpleProgressIndex implements ProgressIndex { // thatSimpleProgressIndex.memtableFlushOrderId return this; } finally { - lock.writeLock().lock(); + lock.writeLock().unlock(); } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java index 46547704d0819a24fa387b012d5bf7a4bd2b6fa2..43330fa392b1daf970c12d44d471e9bd4951af8c 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java @@ -52,8 +52,9 @@ public class PipeTaskMeta { return progressIndex.get(); } - public void updateProgressIndex(ProgressIndex updateIndex) { - progressIndex.updateAndGet(index -> index.updateToMinimumIsAfterProgressIndex(updateIndex)); + public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) { + return progressIndex.updateAndGet( + index -> index.updateToMinimumIsAfterProgressIndex(updateIndex)); } public int getLeaderDataNodeId() { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java deleted file mode 100644 index de3f30f388b07123e18004f0dba8cf8bf5df1cd6..0000000000000000000000000000000000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/HeartbeatScheduler.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.agent.runtime; - -/** HeartbeatScheduler is used to schedule the heartbeat of the pipe. */ -public class HeartbeatScheduler {} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java index adf15709aff6202962ce2d3f72e33ac7503bc07e..16f3716bc13ffef3c6acfa99a5ed7c993243113b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java @@ -163,13 +163,19 @@ class PipeAgentLauncher { PipeAgent.task() .handlePipeMetaChanges( getAllPipeInfoResp.getAllPipeInfo().stream() - .map(PipeMeta::deserialize) + .map( + byteBuffer -> { + final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer); + LOGGER.info( + "Pulled pipe meta from config node: {}, recovering ...", pipeMeta); + return pipeMeta; + }) .collect(Collectors.toList())); - } catch (Throwable throwable) { + } catch (Exception e) { LOGGER.info( "Failed to get pipe task meta from config node. Ignore the exception, " + "because config node may not be ready yet, and meta will be pushed by config node later.", - throwable); + e); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java index d086e6e71ea487091a5f4fbe932f7a7890030fe0..d21d8ed5d4ee56242b8300703c391d8d7ef617ea 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -94,11 +95,11 @@ public class PipeTaskAgent { // if pipe meta does not exist on data node, create a new pipe if (metaOnDataNode == null) { - createPipe(metaFromConfigNode); - if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == PipeStatus.RUNNING) { + if (createPipe(metaFromConfigNode)) { + // if the status recorded in config node is RUNNING, start the pipe startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime()); } - // if the status is STOPPED or DROPPED, do nothing + // if the status recorded in config node is STOPPED or DROPPED, do nothing continue; } @@ -109,8 +110,7 @@ public class PipeTaskAgent { // first check if pipe static meta has changed, if so, drop the pipe and create a new one if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) { dropPipe(pipeName); - createPipe(metaFromConfigNode); - if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == PipeStatus.RUNNING) { + if (createPipe(metaFromConfigNode)) { startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime()); } // if the status is STOPPED or DROPPED, do nothing @@ -245,9 +245,17 @@ public class PipeTaskAgent { ////////////////////////// Manage by Pipe Name ////////////////////////// - private void createPipe(PipeMeta pipeMeta) { - final String pipeName = pipeMeta.getStaticMeta().getPipeName(); - final long creationTime = pipeMeta.getStaticMeta().getCreationTime(); + /** + * Create a new pipe. If the pipe already exists, do nothing and return false. Otherwise, create + * the pipe and return true. + * + * @param pipeMetaFromConfigNode pipe meta from config node + * @return true if the pipe is created successfully and should be started, false if the pipe + * already exists or is created but should not be started + */ + private boolean createPipe(PipeMeta pipeMetaFromConfigNode) { + final String pipeName = pipeMetaFromConfigNode.getStaticMeta().getPipeName(); + final long creationTime = pipeMetaFromConfigNode.getStaticMeta().getCreationTime(); final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); if (existedPipeMeta != null) { @@ -260,7 +268,7 @@ public class PipeTaskAgent { pipeName, creationTime, existedPipeMeta.getRuntimeMeta().getStatus().get().name()); - return; + return false; case DROPPED: LOGGER.info( "Pipe {} (creation time = {}) has already been dropped, but the pipe task meta has not been cleaned up. " @@ -284,16 +292,25 @@ public class PipeTaskAgent { } // create pipe tasks and trigger create() method for each pipe task - final Map pipeTasks = new PipeBuilder(pipeMeta).build(); + final Map pipeTasks = + new PipeBuilder(pipeMetaFromConfigNode).build(); for (PipeTask pipeTask : pipeTasks.values()) { pipeTask.create(); } - pipeTaskManager.addPipeTasks(pipeMeta.getStaticMeta(), pipeTasks); + pipeTaskManager.addPipeTasks(pipeMetaFromConfigNode.getStaticMeta(), pipeTasks); + + // No matter the pipe status from config node is RUNNING or STOPPED, we always set the status + // of pipe meta to STOPPED when it is created. The STOPPED status should always be the initial + // status of a pipe, which makes the status transition logic simpler. + final AtomicReference pipeStatusFromConfigNode = + pipeMetaFromConfigNode.getRuntimeMeta().getStatus(); + final boolean needToStartPipe = pipeStatusFromConfigNode.get() == PipeStatus.RUNNING; + pipeStatusFromConfigNode.set(PipeStatus.STOPPED); + + pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromConfigNode); - // add pipe meta to pipe meta keeper - // note that we do not need to set the status of pipe meta here, because the status of pipe meta - // is already set to STOPPED when it is created - pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta); + // If the pipe status from config node is RUNNING, we will start the pipe later. + return needToStartPipe; } private void dropPipe(String pipeName, long creationTime) { @@ -551,7 +568,8 @@ public class PipeTaskAgent { public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp) throws TException { - if (!req.isNeedPipeMetaList()) { + // do nothing if data node is removing or removed, or request does not need pipe meta list + if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) { return; } @@ -559,6 +577,7 @@ public class PipeTaskAgent { try { for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); + LOGGER.info("Reporting pipe meta: {}", pipeMeta); } } catch (IOException e) { throw new TException(e); diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java index 1dcc1fb9320e631469e14856ea9d2c00eac54081..3e09b86e74f39ffbdad253652d31fbcc685a7f05 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java @@ -46,15 +46,13 @@ public class TsFileEpochManager { // this would not happen, but just in case if (!filePath2Epoch.containsKey(filePath)) { - LOGGER.warn( - String.format("PipeEngine: can not find TsFileEpoch for TsFile %s, create it", filePath)); + LOGGER.info( + String.format("Pipe: can not find TsFileEpoch for TsFile %s, creating it", filePath)); filePath2Epoch.put(filePath, new TsFileEpoch(filePath)); } return new PipeRealtimeCollectEvent( event, - // TODO: we have to make sure that the TsFileInsertionEvent is the last event of the - // TsFileEpoch's life cycle filePath2Epoch.remove(filePath), resource.getDevices().stream() .collect(Collectors.toMap(device -> device, device -> EMPTY_MEASUREMENT_ARRAY)), diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java index 929a7ce4f4744d37d048daf4442b0ff80b10c19a..f69f55cd6f704365b615658000799b3b20343f8c 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.exception.sync.PipeSinkException; import org.apache.iotdb.commons.sync.pipe.PipeInfo; import org.apache.iotdb.commons.sync.pipe.PipeMessage; import org.apache.iotdb.commons.sync.pipesink.PipeSink; -import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp; import org.apache.iotdb.db.client.ConfigNodeClient; @@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; /** Only fetch read request. For write request, return SUCCESS directly. */ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher { @@ -111,16 +109,7 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher { @Override public List getAllPipeInfos() { - try (ConfigNodeClient configNodeClient = - CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - TGetAllPipeInfoResp resp = configNodeClient.getAllPipeInfo(); - return resp.getAllPipeInfo().stream() - .map(PipeInfo::deserializePipeInfo) - .collect(Collectors.toList()); - } catch (Exception e) { - LOGGER.error("Get AllPipeInfos error because {}", e.getMessage(), e); - return Collections.emptyList(); - } + return Collections.emptyList(); } @Override