提交 56255be4 编写于 作者: S Steve Yurong Su

[IOTDB-5968] Pipe: pipe task does not work properly after cluster reboot (#10046)

* fix: deadlock issue found in NodeManager.java

* fix: deadlock issue found in SimpleProgressIndex.java

* fix: logic error of 'pipe task start' found in PipeTaskAgent.java, which may cause data consistency issues

* fix: NPE error from ClusterSyncInfoFetcher.java

* improvement: reduce some exception log printing

* improvement: introduce some log printing for progress index reporting and recovering

(cherry picked from commit 5d78f1a9)
上级 18e485a9
......@@ -206,8 +206,10 @@ public class NodeManager {
}
private TRuntimeConfiguration getRuntimeConfiguration() {
getPipeManager().getPipePluginCoordinator().lock();
// getPipeTaskCoordinator.lock() should be called outside the getPipePluginCoordinator().lock()
// to avoid deadlock
getPipeManager().getPipeTaskCoordinator().lock();
getPipeManager().getPipePluginCoordinator().lock();
getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
getUDFManager().getUdfInfo().acquireUDFTableLock();
......@@ -226,8 +228,11 @@ public class NodeManager {
} finally {
getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
getUDFManager().getUdfInfo().releaseUDFTableLock();
getPipeManager().getPipeTaskCoordinator().unlock();
getPipeManager().getPipePluginCoordinator().unlock();
// getPipeTaskCoordinator.unlock() should be called outside the
// getPipePluginCoordinator().unlock()
// to avoid deadlock
getPipeManager().getPipeTaskCoordinator().unlock();
}
}
......
......@@ -223,22 +223,23 @@ public class PipeTaskInfo implements SnapshotProcessor {
dataRegionGroupId,
new PipeTaskMeta(
new MinimumProgressIndex(), newDataRegionLeader));
} else {
LOGGER.warn(
"The pipe task meta does not contain the data region group {} or the data region group has already been removed",
dataRegionGroupId);
}
// else:
// "The pipe task meta does not contain the data region group {} or
// the data region group has already been removed"
}
}));
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
/**
 * Replaces the content of the pipe meta keeper with the pipe metas carried by the given plan.
 *
 * <p>The keeper is cleared first, then every pipe meta in the plan is registered under the pipe
 * name taken from its static meta and logged at INFO level.
 *
 * @param plan the plan whose pipe meta list becomes the new content of the keeper
 * @return a {@link TSStatus} built from {@code TSStatusCode.SUCCESS_STATUS}
 */
public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
  LOGGER.info("Handling pipe meta changes ...");

  // Drop everything recorded so far before registering the metas from the plan.
  pipeMetaKeeper.clear();
  plan.getPipeMetaList()
      .forEach(
          incomingMeta -> {
            final String pipeName = incomingMeta.getStaticMeta().getPipeName();
            pipeMetaKeeper.addPipeMeta(pipeName, incomingMeta);
            LOGGER.info("Recording pipe meta: {}", incomingMeta);
          });

  return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
......
......@@ -100,10 +100,10 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
}
@Override
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleLeaderChangeProcedure: executeFromHandleOnDataNodes");
pushPipeMetaToDataNodes(env);
pushPipeMetaToDataNodesIgnoreException(env);
}
@Override
......@@ -142,10 +142,10 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur
}
@Override
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleLeaderChangeProcedure: rollbackFromCreateOnDataNodes");
pushPipeMetaToDataNodes(env);
pushPipeMetaToDataNodesIgnoreException(env);
}
@Override
......
......@@ -138,9 +138,20 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
.getValue()
.getProgressIndex()
.isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
runtimeMetaOnConfigNode
.getValue()
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
LOGGER.info(
"Updating progress index for (pipe name: {}, consensus group id: {}) ... Progress index on config node: {}, progress index from data node: {}",
pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
runtimeMetaOnConfigNode.getKey(),
runtimeMetaOnConfigNode.getValue().getProgressIndex(),
runtimeMetaFromDataNode.getProgressIndex());
LOGGER.info(
"Progress index for (pipe name: {}, consensus group id: {}) is updated to {}",
pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
runtimeMetaOnConfigNode.getKey(),
runtimeMetaOnConfigNode
.getValue()
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()));
needWriteConsensusOnConfigNodes = true;
}
......@@ -149,6 +160,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
pipeTaskMetaOnConfigNode.clearExceptionMessages();
for (final PipeRuntimeException exception :
runtimeMetaFromDataNode.getExceptionMessages()) {
pipeTaskMetaOnConfigNode.trackExceptionMessage(exception);
if (exception instanceof PipeRuntimeCriticalException) {
......@@ -159,6 +171,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
.get()
.equals(PipeStatus.STOPPED)) {
pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
needWriteConsensusOnConfigNodes = true;
needPushPipeMetaToDataNodes = true;
......@@ -181,6 +194,7 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
.forEach(
status -> {
status.set(PipeStatus.STOPPED);
needWriteConsensusOnConfigNodes = true;
needPushPipeMetaToDataNodes = true;
......@@ -224,14 +238,14 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV
}
@Override
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeHandleMetaChangeProcedure: executeFromHandleOnDataNodes");
if (!needPushPipeMetaToDataNodes) {
return;
}
pushPipeMetaToDataNodes(env);
pushPipeMetaToDataNodesIgnoreException(env);
}
@Override
......
......@@ -66,10 +66,10 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
}
@Override
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeMetaSyncProcedure: executeFromOperateOnDataNodes");
pushPipeMetaToDataNodes(env);
pushPipeMetaToDataNodesIgnoreException(env);
}
@Override
......@@ -94,10 +94,10 @@ public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
}
@Override
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("PipeMetaSyncProcedure: rollbackFromOperateOnDataNodes");
pushPipeMetaToDataNodes(env);
// do nothing
}
@Override
......
......@@ -200,6 +200,14 @@ public abstract class AbstractOperatePipeProcedureV2
}
}
/**
 * Pushes the pipe meta list to the data nodes on a best-effort basis.
 *
 * <p>Delegates to {@code pushPipeMetaToDataNodes(env)}. Anything thrown by that call is caught
 * and logged at INFO level instead of being propagated to the caller.
 *
 * @param env the procedure environment handed to {@code pushPipeMetaToDataNodes}
 */
protected void pushPipeMetaToDataNodesIgnoreException(ConfigNodeProcedureEnv env) {
  try {
    pushPipeMetaToDataNodes(env);
  } catch (Throwable pushFailure) {
    // Deliberately not rethrown: the failure is only reported, with the cause attached.
    LOGGER.info("Failed to push pipe meta list to data nodes, will retry later.", pushFailure);
  }
}
@Override
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
......
......@@ -165,7 +165,7 @@ public class SimpleProgressIndex implements ProgressIndex {
// thatSimpleProgressIndex.memtableFlushOrderId
return this;
} finally {
lock.writeLock().lock();
lock.writeLock().unlock();
}
}
......
......@@ -52,8 +52,9 @@ public class PipeTaskMeta {
return progressIndex.get();
}
public void updateProgressIndex(ProgressIndex updateIndex) {
progressIndex.updateAndGet(index -> index.updateToMinimumIsAfterProgressIndex(updateIndex));
public ProgressIndex updateProgressIndex(ProgressIndex updateIndex) {
return progressIndex.updateAndGet(
index -> index.updateToMinimumIsAfterProgressIndex(updateIndex));
}
public int getLeaderDataNodeId() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.pipe.agent.runtime;
/**
 * HeartbeatScheduler is used to schedule the heartbeat of the pipe.
 *
 * <p>NOTE(review): the class body is currently empty, so no scheduling logic is implemented here
 * yet.
 */
public class HeartbeatScheduler {}
......@@ -163,13 +163,19 @@ class PipeAgentLauncher {
PipeAgent.task()
.handlePipeMetaChanges(
getAllPipeInfoResp.getAllPipeInfo().stream()
.map(PipeMeta::deserialize)
.map(
byteBuffer -> {
final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
LOGGER.info(
"Pulled pipe meta from config node: {}, recovering ...", pipeMeta);
return pipeMeta;
})
.collect(Collectors.toList()));
} catch (Throwable throwable) {
} catch (Exception e) {
LOGGER.info(
"Failed to get pipe task meta from config node. Ignore the exception, "
+ "because config node may not be ready yet, and meta will be pushed by config node later.",
throwable);
e);
}
}
}
......@@ -49,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
......@@ -94,11 +95,11 @@ public class PipeTaskAgent {
// if pipe meta does not exist on data node, create a new pipe
if (metaOnDataNode == null) {
createPipe(metaFromConfigNode);
if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == PipeStatus.RUNNING) {
if (createPipe(metaFromConfigNode)) {
// if the status recorded in config node is RUNNING, start the pipe
startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime());
}
// if the status is STOPPED or DROPPED, do nothing
// if the status recorded in config node is STOPPED or DROPPED, do nothing
continue;
}
......@@ -109,8 +110,7 @@ public class PipeTaskAgent {
// first check if pipe static meta has changed, if so, drop the pipe and create a new one
if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
dropPipe(pipeName);
createPipe(metaFromConfigNode);
if (metaFromConfigNode.getRuntimeMeta().getStatus().get() == PipeStatus.RUNNING) {
if (createPipe(metaFromConfigNode)) {
startPipe(pipeName, metaFromConfigNode.getStaticMeta().getCreationTime());
}
// if the status is STOPPED or DROPPED, do nothing
......@@ -245,9 +245,17 @@ public class PipeTaskAgent {
////////////////////////// Manage by Pipe Name //////////////////////////
private void createPipe(PipeMeta pipeMeta) {
final String pipeName = pipeMeta.getStaticMeta().getPipeName();
final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
/**
* Create a new pipe. If the pipe already exists, do nothing and return false. Otherwise, create
* the pipe and return true.
*
* @param pipeMetaFromConfigNode pipe meta from config node
* @return true if the pipe is created successfully and should be started, false if the pipe
* already exists or is created but should not be started
*/
private boolean createPipe(PipeMeta pipeMetaFromConfigNode) {
final String pipeName = pipeMetaFromConfigNode.getStaticMeta().getPipeName();
final long creationTime = pipeMetaFromConfigNode.getStaticMeta().getCreationTime();
final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
if (existedPipeMeta != null) {
......@@ -260,7 +268,7 @@ public class PipeTaskAgent {
pipeName,
creationTime,
existedPipeMeta.getRuntimeMeta().getStatus().get().name());
return;
return false;
case DROPPED:
LOGGER.info(
"Pipe {} (creation time = {}) has already been dropped, but the pipe task meta has not been cleaned up. "
......@@ -284,16 +292,25 @@ public class PipeTaskAgent {
}
// create pipe tasks and trigger create() method for each pipe task
final Map<TConsensusGroupId, PipeTask> pipeTasks = new PipeBuilder(pipeMeta).build();
final Map<TConsensusGroupId, PipeTask> pipeTasks =
new PipeBuilder(pipeMetaFromConfigNode).build();
for (PipeTask pipeTask : pipeTasks.values()) {
pipeTask.create();
}
pipeTaskManager.addPipeTasks(pipeMeta.getStaticMeta(), pipeTasks);
pipeTaskManager.addPipeTasks(pipeMetaFromConfigNode.getStaticMeta(), pipeTasks);
// No matter the pipe status from config node is RUNNING or STOPPED, we always set the status
// of pipe meta to STOPPED when it is created. The STOPPED status should always be the initial
// status of a pipe, which makes the status transition logic simpler.
final AtomicReference<PipeStatus> pipeStatusFromConfigNode =
pipeMetaFromConfigNode.getRuntimeMeta().getStatus();
final boolean needToStartPipe = pipeStatusFromConfigNode.get() == PipeStatus.RUNNING;
pipeStatusFromConfigNode.set(PipeStatus.STOPPED);
pipeMetaKeeper.addPipeMeta(pipeName, pipeMetaFromConfigNode);
// add pipe meta to pipe meta keeper
// note that we do not need to set the status of pipe meta here, because the status of pipe meta
// is already set to STOPPED when it is created
pipeMetaKeeper.addPipeMeta(pipeName, pipeMeta);
// If the pipe status from config node is RUNNING, we will start the pipe later.
return needToStartPipe;
}
private void dropPipe(String pipeName, long creationTime) {
......@@ -551,7 +568,8 @@ public class PipeTaskAgent {
public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp)
throws TException {
if (!req.isNeedPipeMetaList()) {
// do nothing if data node is removing or removed, or request does not need pipe meta list
if (PipeAgent.runtime().isShutdown() || !req.isNeedPipeMetaList()) {
return;
}
......@@ -559,6 +577,7 @@ public class PipeTaskAgent {
try {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());
LOGGER.info("Reporting pipe meta: {}", pipeMeta);
}
} catch (IOException e) {
throw new TException(e);
......
......@@ -46,15 +46,13 @@ public class TsFileEpochManager {
// this would not happen, but just in case
if (!filePath2Epoch.containsKey(filePath)) {
LOGGER.warn(
String.format("PipeEngine: can not find TsFileEpoch for TsFile %s, create it", filePath));
LOGGER.info(
String.format("Pipe: can not find TsFileEpoch for TsFile %s, creating it", filePath));
filePath2Epoch.put(filePath, new TsFileEpoch(filePath));
}
return new PipeRealtimeCollectEvent(
event,
// TODO: we have to make sure that the TsFileInsertionEvent is the last event of the
// TsFileEpoch's life cycle
filePath2Epoch.remove(filePath),
resource.getDevices().stream()
.collect(Collectors.toMap(device -> device, device -> EMPTY_MEASUREMENT_ARRAY)),
......
......@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.exception.sync.PipeSinkException;
import org.apache.iotdb.commons.sync.pipe.PipeInfo;
import org.apache.iotdb.commons.sync.pipe.PipeMessage;
import org.apache.iotdb.commons.sync.pipesink.PipeSink;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
......@@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/** Only fetch read request. For write request, return SUCCESS directly. */
public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
......@@ -111,16 +109,7 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
@Override
public List<PipeInfo> getAllPipeInfos() {
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TGetAllPipeInfoResp resp = configNodeClient.getAllPipeInfo();
return resp.getAllPipeInfo().stream()
.map(PipeInfo::deserializePipeInfo)
.collect(Collectors.toList());
} catch (Exception e) {
LOGGER.error("Get AllPipeInfos error because {}", e.getMessage(), e);
return Collections.emptyList();
}
return Collections.emptyList();
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册