diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index b6ed07a869e8289a8065e5970edc2e3884f328c8..223c0372372c389e403815f37c00ea1a0c2f0212 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -208,7 +208,7 @@ public class ConfigManager implements IManager { private static final CommonConfig COMMON_CONF = CommonDescriptor.getInstance().getConfig(); /** Manage PartitionTable read/write requests through the ConsensusLayer. */ - private AtomicReference consensusManager; + private final AtomicReference consensusManager = new AtomicReference<>(); /** Manage cluster node. */ private final NodeManager nodeManager; @@ -305,12 +305,11 @@ public class ConfigManager implements IManager { } public void initConsensusManager() throws IOException { - ConsensusManager consensusManager = new ConsensusManager(this, this.stateMachine); - this.consensusManager = new AtomicReference<>(consensusManager); + this.consensusManager.set(new ConsensusManager(this, this.stateMachine)); } public void close() throws IOException { - if (consensusManager != null) { + if (consensusManager.get() != null) { consensusManager.get().close(); } if (partitionManager != null) { @@ -1122,7 +1121,7 @@ public class ConfigManager implements IManager { public TSStatus createPeerForConsensusGroup(List configNodeLocations) { for (int i = 0; i < 30; i++) { try { - if (consensusManager == null) { + if (consensusManager.get() == null) { Thread.sleep(1000); } else { // When add non Seed-ConfigNode to the ConfigNodeGroup, the parameter should be emptyList diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java index 36bc8afcb4c91929abae2910f77e4348ecf47044..d0b4a016fa169fb263c73912011de8c23f714f45 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java @@ -64,7 +64,7 @@ public abstract class Procedure implements Comparable> { private volatile long timeout = NO_TIMEOUT; private volatile long lastUpdate; - private AtomicReference result = null; + private final AtomicReference result = new AtomicReference<>(); private volatile boolean locked = false; private boolean lockedWhenLoading = false; @@ -173,7 +173,7 @@ public abstract class Procedure implements Comparable> { } // result - if (result != null) { + if (result.get() != null) { stream.writeInt(result.get().length); stream.write(result.get()); } else { @@ -652,7 +652,7 @@ public abstract class Procedure implements Comparable> { * @param result the serialized result that will be passed to the client */ protected void setResult(byte[] result) { - this.result = new AtomicReference<>(result); + this.result.set(result); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index d8c5db84328da200b8f4778b727c94a70f2ff3e7..c988f210824fdac325da19de011a63682130a492 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -710,7 +710,7 @@ public class ProcedureExecutor { private class WorkerThread extends StoppableThread { private final AtomicLong startTime = new AtomicLong(Long.MAX_VALUE); - private AtomicReference> activeProcedure; + private final AtomicReference> activeProcedure = new AtomicReference<>(); protected long keepAliveTime = -1; public WorkerThread(ThreadGroup threadGroup) { @@ -736,19 +736,19 @@ public class ProcedureExecutor { if (procedure == null) { continue; } - this.activeProcedure = new AtomicReference<>(procedure); + this.activeProcedure.set(procedure); int activeCount = activeExecutorCount.incrementAndGet(); startTime.set(System.currentTimeMillis()); executeProcedure(procedure); activeCount = activeExecutorCount.decrementAndGet(); LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(), activeCount); - this.activeProcedure = null; + this.activeProcedure.set(null); lastUpdated = System.currentTimeMillis(); startTime.set(lastUpdated); } } catch (Throwable throwable) { - if (this.activeProcedure != null) { + if (this.activeProcedure.get() != null) { LOG.warn("Worker terminated {}", this.activeProcedure.get(), throwable); } } finally { @@ -804,7 +804,7 @@ public class ProcedureExecutor { // check if any of the worker is stuck int stuckCount = 0; for (WorkerThread worker : workerThreads) { - if (worker.activeProcedure == null + if (worker.activeProcedure.get() == null || worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) { continue; } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java index a150e5f123aac02d6c7d78ead3b10a235d760d21..3c02ca42e2f03f9f36323d58a5a68c3248ba0799 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RequestMessage.java @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; class RequestMessage implements Message { private final IConsensusRequest actualRequest; - private AtomicReference serializedContent = new AtomicReference<>(); + private final AtomicReference serializedContent = new AtomicReference<>(); RequestMessage(IConsensusRequest request) { this.actualRequest = request; @@ -45,9 +45,8 @@ class RequestMessage implements Message { if (serializedContent.get() == null) { synchronized (this) { if (serializedContent.get() == null) { - serializedContent = - new AtomicReference<>( - UnsafeByteOperations.unsafeWrap(actualRequest.serializeToByteBuffer())); + serializedContent.set( + UnsafeByteOperations.unsafeWrap(actualRequest.serializeToByteBuffer())); } } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java index ca9cffa896289bce634336474e51fc18ab59f07b..ce1b7c14593cea84e8fc7e613fb09a7398ed60b2 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ResponseMessage.java @@ -38,7 +38,7 @@ class ResponseMessage implements Message { */ private final Object contentHolder; - private AtomicReference serializedData = new AtomicReference<>(); + private final AtomicReference serializedData = new AtomicReference<>(); private final Logger logger = LoggerFactory.getLogger(ResponseMessage.class); ResponseMessage(Object content) { @@ -57,8 +57,7 @@ class ResponseMessage implements Message { assert contentHolder instanceof TSStatus; TSStatus status = (TSStatus) contentHolder; try { - serializedData = - new AtomicReference<>(ByteString.copyFrom(Utils.serializeTSStatus(status))); + serializedData.set(ByteString.copyFrom(Utils.serializeTSStatus(status))); } catch (TException e) { logger.warn("serialize TSStatus failed {}", status); }