diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index cce6481b8daa119def971b3c4c2f95d4b3e6502a..43b01f08d8e2a6f3f0418e7ef996a428535ecc4c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -19,8 +19,8 @@ package org.apache.rocketmq.store; import java.net.Inet6Address; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -1403,48 +1403,55 @@ public class CommitLog { * GroupCommit Service */ class GroupCommitService extends FlushCommitLogService { - private volatile List requestsWrite = new ArrayList(); - private volatile List requestsRead = new ArrayList(); + private volatile LinkedList requestsWrite = new LinkedList(); + private volatile LinkedList requestsRead = new LinkedList(); + private final PutMessageSpinLock lock = new PutMessageSpinLock(); public synchronized void putRequest(final GroupCommitRequest request) { - synchronized (this.requestsWrite) { + lock.lock(); + try { this.requestsWrite.add(request); + } finally { + lock.unlock(); } this.wakeup(); } private void swapRequests() { - List tmp = this.requestsWrite; - this.requestsWrite = this.requestsRead; - this.requestsRead = tmp; + lock.lock(); + try { + LinkedList tmp = this.requestsWrite; + this.requestsWrite = this.requestsRead; + this.requestsRead = tmp; + } finally { + lock.unlock(); + } } private void doCommit() { - synchronized (this.requestsRead) { - if (!this.requestsRead.isEmpty()) { - for (GroupCommitRequest req : this.requestsRead) { - // There may be a message in the next file, so a maximum of - // two times the flush - boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); - for (int i = 0; i < 2 && !flushOK; i++) { - CommitLog.this.mappedFileQueue.flush(0); - flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); - } - - req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); + if (!this.requestsRead.isEmpty()) { + for (GroupCommitRequest req : this.requestsRead) { + // There may be a message in the next file, so a maximum of + // two times the flush + boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); + for (int i = 0; i < 2 && !flushOK; i++) { + CommitLog.this.mappedFileQueue.flush(0); + flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } - long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); - if (storeTimestamp > 0) { - CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); - } + req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); + } - this.requestsRead.clear(); - } else { - // Because of individual messages is set to not sync flush, it - // will come to this process - CommitLog.this.mappedFileQueue.flush(0); + long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); + if (storeTimestamp > 0) { + CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } + + this.requestsRead = new LinkedList<>(); + } else { + // Because of individual messages is set to not sync flush, it + // will come to this process + CommitLog.this.mappedFileQueue.flush(0); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index 34c51eb9e26fc5df4efbc8bef58d360f759a5fe5..d4d410996789d6de64d7b66fcf3804b25af3ba41 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -25,7 +25,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -39,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.PutMessageSpinLock; import org.apache.rocketmq.store.PutMessageStatus; public class HAService { @@ -254,12 +254,16 @@ public class HAService { class GroupTransferService extends ServiceThread { private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject(); - private volatile List requestsWrite = new ArrayList<>(); - private volatile List requestsRead = new ArrayList<>(); + private final PutMessageSpinLock lock = new PutMessageSpinLock(); + private volatile LinkedList requestsWrite = new LinkedList<>(); + private volatile LinkedList requestsRead = new LinkedList<>(); - public synchronized void putRequest(final CommitLog.GroupCommitRequest request) { - synchronized (this.requestsWrite) { + public void putRequest(final CommitLog.GroupCommitRequest request) { + lock.lock(); + try { this.requestsWrite.add(request); + } finally { + lock.unlock(); } this.wakeup(); } @@ -269,32 +273,35 @@ public class HAService { } private void swapRequests() { - List tmp = this.requestsWrite; - this.requestsWrite = this.requestsRead; - this.requestsRead = tmp; + lock.lock(); + try { + LinkedList tmp = this.requestsWrite; + this.requestsWrite = this.requestsRead; + this.requestsRead = tmp; + } finally { + lock.unlock(); + } } private void doWaitTransfer() { - synchronized (this.requestsRead) { - if (!this.requestsRead.isEmpty()) { - for (CommitLog.GroupCommitRequest req : this.requestsRead) { - boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now() + if (!this.requestsRead.isEmpty()) { + for (CommitLog.GroupCommitRequest req : this.requestsRead) { + boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); + long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now() + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(); - while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) { - this.notifyTransferObject.waitForRunning(1000); - transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - } - - if (!transferOK) { - log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); - } + while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) { + this.notifyTransferObject.waitForRunning(1000); + transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); + } - req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT); + if (!transferOK) { + log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); } - this.requestsRead.clear(); + req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } + + this.requestsRead = new LinkedList<>(); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java index 75b7597feade1c34092d8f6fee8d0d12f4126fe2..d5ed65f5f760f177127a56c196563c89d904d5e9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java @@ -20,40 +20,43 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; public class WaitNotifyObject { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - protected final HashMap waitingThreadTable = - new HashMap(16); + protected final ConcurrentHashMap waitingThreadTable = + new ConcurrentHashMap(16); - protected volatile boolean hasNotified = false; + protected AtomicBoolean hasNotified = new AtomicBoolean(false); public void wakeup() { - synchronized (this) { - if (!this.hasNotified) { - this.hasNotified = true; + boolean needNotify = hasNotified.compareAndSet(false, true); + if (needNotify) { + synchronized (this) { this.notify(); } } } protected void waitForRunning(long interval) { + if (this.hasNotified.compareAndSet(true, false)) { + this.onWaitEnd(); + return; + } synchronized (this) { - if (this.hasNotified) { - this.hasNotified = false; - this.onWaitEnd(); - return; - } - try { + if (this.hasNotified.compareAndSet(true, false)) { + this.onWaitEnd(); + return; + } this.wait(interval); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { - this.hasNotified = false; + this.hasNotified.set(false); this.onWaitEnd(); } } @@ -63,15 +66,14 @@ public class WaitNotifyObject { } public void wakeupAll() { - synchronized (this) { - boolean needNotify = false; - - for (Map.Entry entry : this.waitingThreadTable.entrySet()) { - needNotify = needNotify || !entry.getValue(); - entry.setValue(true); + boolean needNotify = false; + for (Map.Entry entry : this.waitingThreadTable.entrySet()) { + if (entry.getValue().compareAndSet(false, true)) { + needNotify = true; } - - if (needNotify) { + } + if (needNotify) { + synchronized (this) { this.notifyAll(); } } @@ -79,20 +81,22 @@ public class WaitNotifyObject { public void allWaitForRunning(long interval) { long currentThreadId = Thread.currentThread().getId(); + AtomicBoolean notified = this.waitingThreadTable.computeIfAbsent(currentThreadId, k -> new AtomicBoolean(false)); + if (notified.compareAndSet(true, false)) { + this.onWaitEnd(); + return; + } synchronized (this) { - Boolean notified = this.waitingThreadTable.get(currentThreadId); - if (notified != null && notified) { - this.waitingThreadTable.put(currentThreadId, false); - this.onWaitEnd(); - return; - } - try { + if (notified.compareAndSet(true, false)) { + this.onWaitEnd(); + return; + } this.wait(interval); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { - this.waitingThreadTable.put(currentThreadId, false); + notified.set(false); this.onWaitEnd(); } }