未验证 提交 a3c1b68d 编写于 作者: P Potato 提交者: GitHub

Fix semantic incompatibility when AtomicReference replaces volatile (#10418)

Signed-off-by: NOneSizeFitQuorum <tanxinyu@apache.org>
上级 dcb1f73f
......@@ -208,7 +208,7 @@ public class ConfigManager implements IManager {
private static final CommonConfig COMMON_CONF = CommonDescriptor.getInstance().getConfig();
/** Manage PartitionTable read/write requests through the ConsensusLayer. */
private AtomicReference<ConsensusManager> consensusManager;
private final AtomicReference<ConsensusManager> consensusManager = new AtomicReference<>();
/** Manage cluster node. */
private final NodeManager nodeManager;
......@@ -305,12 +305,11 @@ public class ConfigManager implements IManager {
}
public void initConsensusManager() throws IOException {
ConsensusManager consensusManager = new ConsensusManager(this, this.stateMachine);
this.consensusManager = new AtomicReference<>(consensusManager);
this.consensusManager.set(new ConsensusManager(this, this.stateMachine));
}
public void close() throws IOException {
if (consensusManager != null) {
if (consensusManager.get() != null) {
consensusManager.get().close();
}
if (partitionManager != null) {
......@@ -1122,7 +1121,7 @@ public class ConfigManager implements IManager {
public TSStatus createPeerForConsensusGroup(List<TConfigNodeLocation> configNodeLocations) {
for (int i = 0; i < 30; i++) {
try {
if (consensusManager == null) {
if (consensusManager.get() == null) {
Thread.sleep(1000);
} else {
// When add non Seed-ConfigNode to the ConfigNodeGroup, the parameter should be emptyList
......
......@@ -64,7 +64,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
private volatile long timeout = NO_TIMEOUT;
private volatile long lastUpdate;
private AtomicReference<byte[]> result = null;
private final AtomicReference<byte[]> result = new AtomicReference<>();
private volatile boolean locked = false;
private boolean lockedWhenLoading = false;
......@@ -173,7 +173,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
}
// result
if (result != null) {
if (result.get() != null) {
stream.writeInt(result.get().length);
stream.write(result.get());
} else {
......@@ -652,7 +652,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
* @param result the serialized result that will be passed to the client
*/
protected void setResult(byte[] result) {
this.result = new AtomicReference<>(result);
this.result.set(result);
}
/**
......
......@@ -710,7 +710,7 @@ public class ProcedureExecutor<Env> {
private class WorkerThread extends StoppableThread {
private final AtomicLong startTime = new AtomicLong(Long.MAX_VALUE);
private AtomicReference<Procedure<Env>> activeProcedure;
private final AtomicReference<Procedure<Env>> activeProcedure = new AtomicReference<>();
protected long keepAliveTime = -1;
public WorkerThread(ThreadGroup threadGroup) {
......@@ -736,19 +736,19 @@ public class ProcedureExecutor<Env> {
if (procedure == null) {
continue;
}
this.activeProcedure = new AtomicReference<>(procedure);
this.activeProcedure.set(procedure);
int activeCount = activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
executeProcedure(procedure);
activeCount = activeExecutorCount.decrementAndGet();
LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(), activeCount);
this.activeProcedure = null;
this.activeProcedure.set(null);
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
}
} catch (Throwable throwable) {
if (this.activeProcedure != null) {
if (this.activeProcedure.get() != null) {
LOG.warn("Worker terminated {}", this.activeProcedure.get(), throwable);
}
} finally {
......@@ -804,7 +804,7 @@ public class ProcedureExecutor<Env> {
// check if any of the worker is stuck
int stuckCount = 0;
for (WorkerThread worker : workerThreads) {
if (worker.activeProcedure == null
if (worker.activeProcedure.get() == null
|| worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
continue;
}
......
......@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
class RequestMessage implements Message {
private final IConsensusRequest actualRequest;
private AtomicReference<ByteString> serializedContent = new AtomicReference<>();
private final AtomicReference<ByteString> serializedContent = new AtomicReference<>();
RequestMessage(IConsensusRequest request) {
this.actualRequest = request;
......@@ -45,9 +45,8 @@ class RequestMessage implements Message {
if (serializedContent.get() == null) {
synchronized (this) {
if (serializedContent.get() == null) {
serializedContent =
new AtomicReference<>(
UnsafeByteOperations.unsafeWrap(actualRequest.serializeToByteBuffer()));
serializedContent.set(
UnsafeByteOperations.unsafeWrap(actualRequest.serializeToByteBuffer()));
}
}
}
......
......@@ -38,7 +38,7 @@ class ResponseMessage implements Message {
*/
private final Object contentHolder;
private AtomicReference<ByteString> serializedData = new AtomicReference<>();
private final AtomicReference<ByteString> serializedData = new AtomicReference<>();
private final Logger logger = LoggerFactory.getLogger(ResponseMessage.class);
ResponseMessage(Object content) {
......@@ -57,8 +57,7 @@ class ResponseMessage implements Message {
assert contentHolder instanceof TSStatus;
TSStatus status = (TSStatus) contentHolder;
try {
serializedData =
new AtomicReference<>(ByteString.copyFrom(Utils.serializeTSStatus(status)));
serializedData.set(ByteString.copyFrom(Utils.serializeTSStatus(status)));
} catch (TException e) {
logger.warn("serialize TSStatus failed {}", status);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册