提交 461b140d 编写于 作者: J JackieTien97

Revert "[IOTDB-6116] Disassociate the IoTConsensus retry logic from the forkjoinPool (#10872)"

This reverts commit b4455404.
上级 0cd30225
......@@ -22,8 +22,6 @@ package org.apache.iotdb.consensus.iot;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.RegisterManager;
......@@ -66,8 +64,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class IoTConsensus implements IConsensus {
......@@ -85,7 +81,6 @@ public class IoTConsensus implements IConsensus {
private final IoTConsensusConfig config;
private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
private final ScheduledExecutorService retryService;
public IoTConsensus(ConsensusConfig config, Registry registry) {
this.thisNode = config.getThisNodeEndPoint();
......@@ -102,9 +97,6 @@ public class IoTConsensus implements IConsensus {
new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
.createClientManager(
new SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
this.retryService =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName());
// init IoTConsensus memory manager
IoTConsensusMemoryManager.getInstance()
.init(
......@@ -141,7 +133,6 @@ public class IoTConsensus implements IConsensus {
new Peer(consensusGroupId, thisNodeId, thisNode),
new ArrayList<>(),
registry.apply(consensusGroupId),
retryService,
clientManager,
syncClientManager,
config);
......@@ -158,13 +149,6 @@ public class IoTConsensus implements IConsensus {
clientManager.close();
syncClientManager.close();
registerManager.deregisterAll();
retryService.shutdown();
try {
retryService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
Thread.currentThread().interrupt();
}
}
@Override
......@@ -230,7 +214,6 @@ public class IoTConsensus implements IConsensus {
new Peer(groupId, thisNodeId, thisNode),
peers,
registry.apply(groupId),
retryService,
clientManager,
syncClientManager,
config);
......
......@@ -80,7 +80,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
......@@ -113,14 +112,12 @@ public class IoTConsensusServerImpl {
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
private final String consensusGroupId;
private final ScheduledExecutorService retryService;
public IoTConsensusServerImpl(
String storageDir,
Peer thisNode,
List<Peer> configuration,
IStateMachine stateMachine,
ScheduledExecutorService retryService,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager,
IoTConsensusConfig config) {
......@@ -136,7 +133,6 @@ public class IoTConsensusServerImpl {
} else {
persistConfiguration();
}
this.retryService = retryService;
this.config = config;
this.consensusGroupId = thisNode.getGroupId().toString();
consensusReqReader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
......@@ -736,10 +732,6 @@ public class IoTConsensusServerImpl {
return searchIndex;
}
public ScheduledExecutorService getRetryService() {
return retryService;
}
public boolean isReadOnly() {
return stateMachine.isReadOnly();
}
......
......@@ -29,7 +29,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;
public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRes> {
......@@ -88,29 +88,31 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe
}
private void sleepCorrespondingTimeAndRetryAsynchronous() {
long sleepTime =
Math.min(
(long)
(thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
* Math.pow(2, retryCount)),
thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
thread
.getImpl()
.getRetryService()
.schedule(
() -> {
if (thread.isStopped()) {
logger.debug(
"LogDispatcherThread {} has been stopped, "
+ "we will not retrying this Batch {} after {} times",
thread.getPeer(),
batch,
retryCount);
} else {
thread.sendBatchAsync(batch, this);
}
},
sleepTime,
TimeUnit.MILLISECONDS);
// TODO handle forever retry
CompletableFuture.runAsync(
() -> {
try {
long defaultSleepTime =
(long)
(thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
* Math.pow(2, retryCount));
Thread.sleep(
Math.min(
defaultSleepTime, thread.getConfig().getReplication().getMaxRetryWaitTimeMs()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Unexpected interruption during retry pending batch");
}
if (thread.isStopped()) {
logger.debug(
"LogDispatcherThread {} has been stopped, "
+ "we will not retrying this Batch {} after {} times",
thread.getPeer(),
batch,
retryCount);
} else {
thread.sendBatchAsync(batch, this);
}
});
}
}
......@@ -299,10 +299,6 @@ public class LogDispatcher {
return stopped;
}
public IoTConsensusServerImpl getImpl() {
return impl;
}
@Override
public void run() {
logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
......
......@@ -93,6 +93,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -125,6 +126,7 @@ class RatisConsensus implements IConsensus {
/** TODO make it configurable */
private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
private final ExecutorService addExecutor;
private final ScheduledExecutorService diskGuardian;
private final long triggerSnapshotThreshold;
......@@ -154,6 +156,7 @@ class RatisConsensus implements IConsensus {
this.ratisMetricSet = new RatisMetricSet();
this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize();
addExecutor = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.RATIS_ADD.getName());
diskGuardian =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
......@@ -186,8 +189,10 @@ class RatisConsensus implements IConsensus {
@Override
public void stop() throws IOException {
addExecutor.shutdown();
diskGuardian.shutdown();
try {
addExecutor.awaitTermination(5, TimeUnit.SECONDS);
diskGuardian.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
......@@ -195,8 +200,8 @@ class RatisConsensus implements IConsensus {
} finally {
clientManager.close();
server.close();
MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
}
MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
}
private boolean shouldRetry(RaftClientReply reply) {
......
......@@ -1070,7 +1070,7 @@ public class IoTDBConfig {
// IoTConsensus Config
private int maxLogEntriesNumPerBatch = 1024;
private int maxSizePerBatch = 16 * 1024 * 1024;
private int maxPendingBatchesNum = 5;
private int maxPendingBatchesNum = 12;
private double maxMemoryRatioForQueue = 0.6;
/** Pipe related */
......
......@@ -103,13 +103,13 @@ public enum ThreadName {
IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeIoTConsensusServiceClientPool"),
LOG_DISPATCHER("LogDispatcher"),
LOG_DISPATCHER_RETRY_EXECUTOR("LogDispatcherRetryExecutor"),
// -------------------------- Ratis --------------------------
// NOTICE: The thread name of ratis cannot be edited here!
// We list the thread name here just for distinguishing what module the thread belongs to.
RAFT_SERVER_PROXY_EXECUTOR("\\d+-impl-thread"),
RAFT_SERVER_EXECUTOR("\\d+-server-thread"),
RAFT_SERVER_CLIENT_EXECUTOR("\\d+-client-thread"),
RATIS_ADD("Ratis-Add"),
SEGMENT_RAFT_WORKER("SegmentedRaftLogWorker"),
STATE_MACHINE_UPDATER("StateMachineUpdater"),
FOLLOWER_STATE("FollowerState"),
......@@ -236,8 +236,7 @@ public enum ThreadName {
IOT_CONSENSUS_RPC_SERVICE,
IOT_CONSENSUS_RPC_PROCESSOR,
ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL,
LOG_DISPATCHER,
LOG_DISPATCHER_RETRY_EXECUTOR));
LOG_DISPATCHER));
private static final Set<ThreadName> ratisThreadNames =
new HashSet<>(
......@@ -245,6 +244,7 @@ public enum ThreadName {
RAFT_SERVER_PROXY_EXECUTOR,
RAFT_SERVER_EXECUTOR,
RAFT_SERVER_CLIENT_EXECUTOR,
RATIS_ADD,
SEGMENT_RAFT_WORKER,
STATE_MACHINE_UPDATER,
FOLLOWER_STATE,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册