From 461b140dc4f708f4a1d516e18dc2673da4db803f Mon Sep 17 00:00:00 2001 From: JackieTien97 Date: Tue, 22 Aug 2023 20:43:26 +0800 Subject: [PATCH] Revert "[IOTDB-6116] Disassociate the IoTConsensus retry logic from the forkjoinPool (#10872)" This reverts commit b445540475a046b657045fa0f76d0b71ab983875. --- .../iotdb/consensus/iot/IoTConsensus.java | 17 ------ .../consensus/iot/IoTConsensusServerImpl.java | 8 --- .../iot/client/DispatchLogHandler.java | 52 ++++++++++--------- .../iot/logdispatcher/LogDispatcher.java | 4 -- .../iotdb/consensus/ratis/RatisConsensus.java | 7 ++- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../iotdb/commons/concurrent/ThreadName.java | 6 +-- 7 files changed, 37 insertions(+), 59 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 3afc4590c7..047d557ad5 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -22,8 +22,6 @@ package org.apache.iotdb.consensus.iot; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; -import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.RegisterManager; @@ -66,8 +64,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class IoTConsensus implements IConsensus { @@ -85,7 +81,6 @@ public class IoTConsensus implements IConsensus { private final IoTConsensusConfig config; private final IClientManager clientManager; private final IClientManager syncClientManager; - private final ScheduledExecutorService retryService; public IoTConsensus(ConsensusConfig config, Registry registry) { this.thisNode = config.getThisNodeEndPoint(); @@ -102,9 +97,6 @@ public class IoTConsensus implements IConsensus { new IClientManager.Factory() .createClientManager( new SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig())); - this.retryService = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( - ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName()); // init IoTConsensus memory manager IoTConsensusMemoryManager.getInstance() .init( @@ -141,7 +133,6 @@ public class IoTConsensus implements IConsensus { new Peer(consensusGroupId, thisNodeId, thisNode), new ArrayList<>(), registry.apply(consensusGroupId), - retryService, clientManager, syncClientManager, config); @@ -158,13 +149,6 @@ public class IoTConsensus implements IConsensus { clientManager.close(); syncClientManager.close(); registerManager.deregisterAll(); - retryService.shutdown(); - try { - retryService.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e); - Thread.currentThread().interrupt(); - } } @Override @@ -230,7 +214,6 @@ public class IoTConsensus implements IConsensus { new Peer(groupId, thisNodeId, thisNode), peers, registry.apply(groupId), - retryService, clientManager, syncClientManager, config); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 92f3584705..0b76a690db 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -80,7 +80,6 @@ import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -113,14 +112,12 @@ public class IoTConsensusServerImpl { private final IClientManager syncClientManager; private final IoTConsensusServerMetrics ioTConsensusServerMetrics; private final String consensusGroupId; - private final ScheduledExecutorService retryService; public IoTConsensusServerImpl( String storageDir, Peer thisNode, List configuration, IStateMachine stateMachine, - ScheduledExecutorService retryService, IClientManager clientManager, IClientManager syncClientManager, IoTConsensusConfig config) { @@ -136,7 +133,6 @@ public class IoTConsensusServerImpl { } else { persistConfiguration(); } - this.retryService = retryService; this.config = config; this.consensusGroupId = thisNode.getGroupId().toString(); consensusReqReader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan()); @@ -736,10 +732,6 @@ public class IoTConsensusServerImpl { return searchIndex; } - public ScheduledExecutorService getRetryService() { - return retryService; - } - public boolean isReadOnly() { return stateMachine.isReadOnly(); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java index f69ea0c2d7..94ba349c6d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java @@ -29,7 +29,7 @@ import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; public class DispatchLogHandler implements AsyncMethodCallback { @@ -88,29 +88,31 @@ public class DispatchLogHandler implements AsyncMethodCallback { - if (thread.isStopped()) { - logger.debug( - "LogDispatcherThread {} has been stopped, " - + "we will not retrying this Batch {} after {} times", - thread.getPeer(), - batch, - retryCount); - } else { - thread.sendBatchAsync(batch, this); - } - }, - sleepTime, - TimeUnit.MILLISECONDS); + // TODO handle forever retry + CompletableFuture.runAsync( + () -> { + try { + long defaultSleepTime = + (long) + (thread.getConfig().getReplication().getBasicRetryWaitTimeMs() + * Math.pow(2, retryCount)); + Thread.sleep( + Math.min( + defaultSleepTime, thread.getConfig().getReplication().getMaxRetryWaitTimeMs())); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Unexpected interruption during retry pending batch"); + } + if (thread.isStopped()) { + logger.debug( + "LogDispatcherThread {} has been stopped, " + + "we will not retrying this Batch {} after {} times", + thread.getPeer(), + batch, + retryCount); + } else { + thread.sendBatchAsync(batch, this); + } + }); } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index 30a2cc2d22..ef88d5a816 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -299,10 +299,6 @@ public class LogDispatcher { return stopped; } - public IoTConsensusServerImpl getImpl() { - return impl; - } - @Override public void run() { logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index d73964c700..055db6c0bb 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -93,6 +93,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -125,6 +126,7 @@ class RatisConsensus implements IConsensus { /** TODO make it configurable */ private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20); + private final ExecutorService addExecutor; private final ScheduledExecutorService diskGuardian; private final long triggerSnapshotThreshold; @@ -154,6 +156,7 @@ class RatisConsensus implements IConsensus { this.ratisMetricSet = new RatisMetricSet(); this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize(); + addExecutor = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.RATIS_ADD.getName()); diskGuardian = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( ThreadName.RATIS_BG_DISK_GUARDIAN.getName()); @@ -186,8 +189,10 @@ class RatisConsensus implements IConsensus { @Override public void stop() throws IOException { + addExecutor.shutdown(); diskGuardian.shutdown(); try { + addExecutor.awaitTermination(5, TimeUnit.SECONDS); diskGuardian.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e); @@ -195,8 +200,8 @@ class RatisConsensus implements IConsensus { } finally { clientManager.close(); server.close(); - MetricService.getInstance().removeMetricSet(this.ratisMetricSet); } + MetricService.getInstance().removeMetricSet(this.ratisMetricSet); } private boolean shouldRetry(RaftClientReply reply) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 7ffd5a97d5..c7d0dedaf5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1070,7 +1070,7 @@ public class IoTDBConfig { // IoTConsensus Config private int maxLogEntriesNumPerBatch = 1024; private int maxSizePerBatch = 16 * 1024 * 1024; - private int maxPendingBatchesNum = 5; + private int maxPendingBatchesNum = 12; private double maxMemoryRatioForQueue = 0.6; /** Pipe related */ diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index ebc7a2ef51..78426603fc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -103,13 +103,13 @@ public enum ThreadName { IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"), ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeIoTConsensusServiceClientPool"), LOG_DISPATCHER("LogDispatcher"), - LOG_DISPATCHER_RETRY_EXECUTOR("LogDispatcherRetryExecutor"), // -------------------------- Ratis -------------------------- // NOTICE: The thread name of ratis cannot be edited here! // We list the thread name here just for distinguishing what module the thread belongs to. RAFT_SERVER_PROXY_EXECUTOR("\\d+-impl-thread"), RAFT_SERVER_EXECUTOR("\\d+-server-thread"), RAFT_SERVER_CLIENT_EXECUTOR("\\d+-client-thread"), + RATIS_ADD("Ratis-Add"), SEGMENT_RAFT_WORKER("SegmentedRaftLogWorker"), STATE_MACHINE_UPDATER("StateMachineUpdater"), FOLLOWER_STATE("FollowerState"), @@ -236,8 +236,7 @@ public enum ThreadName { IOT_CONSENSUS_RPC_SERVICE, IOT_CONSENSUS_RPC_PROCESSOR, ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL, - LOG_DISPATCHER, - LOG_DISPATCHER_RETRY_EXECUTOR)); + LOG_DISPATCHER)); private static final Set ratisThreadNames = new HashSet<>( @@ -245,6 +244,7 @@ public enum ThreadName { RAFT_SERVER_PROXY_EXECUTOR, RAFT_SERVER_EXECUTOR, RAFT_SERVER_CLIENT_EXECUTOR, + RATIS_ADD, SEGMENT_RAFT_WORKER, STATE_MACHINE_UPDATER, FOLLOWER_STATE, -- GitLab