diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index b4bf298ba80468b7cbe4b211546aeb1a11c58a15..3bbe675854f0becad7a190d2c752d435be7eb857 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -995,12 +995,12 @@ public class CommitLog { private volatile List requestsWrite = new ArrayList(); private volatile List requestsRead = new ArrayList(); - public void putRequest(final GroupCommitRequest request) { - synchronized (this) { + public synchronized void putRequest(final GroupCommitRequest request) { + synchronized (this.requestsWrite) { this.requestsWrite.add(request); - if (hasNotified.compareAndSet(false, true)) { - waitPoint.countDown(); // notify - } + } + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify } } @@ -1011,32 +1011,34 @@ public class CommitLog { } private void doCommit() { - if (!this.requestsRead.isEmpty()) { - for (GroupCommitRequest req : this.requestsRead) { - // There may be a message in the next file, so a maximum of - // two times the flush - boolean flushOK = false; - for (int i = 0; i < 2 && !flushOK; i++) { - flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); - - if (!flushOK) { - CommitLog.this.mappedFileQueue.flush(0); + synchronized (this.requestsRead) { + if (!this.requestsRead.isEmpty()) { + for (GroupCommitRequest req : this.requestsRead) { + // There may be a message in the next file, so a maximum of + // two times the flush + boolean flushOK = false; + for (int i = 0; i < 2 && !flushOK; i++) { + flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); + + if (!flushOK) { + CommitLog.this.mappedFileQueue.flush(0); + } } + + req.wakeupCustomer(flushOK); } - req.wakeupCustomer(flushOK); - } + long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); + if (storeTimestamp > 0) { + CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); + } - long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); - if (storeTimestamp > 0) { - CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); + this.requestsRead.clear(); + } else { + // Because of individual messages is set to not sync flush, it + // will come to this process + CommitLog.this.mappedFileQueue.flush(0); } - - this.requestsRead.clear(); - } else { - // Because of individual messages is set to not sync flush, it - // will come to this process - CommitLog.this.mappedFileQueue.flush(0); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index 762bdb6adca0c052ff49c6f1cf3f259ec38a36cc..f507b368f07b4dd913b9f9ec857bf76f60085b10 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -253,12 +253,12 @@ public class HAService { private volatile List requestsWrite = new ArrayList<>(); private volatile List requestsRead = new ArrayList<>(); - public void putRequest(final CommitLog.GroupCommitRequest request) { - synchronized (this) { + public synchronized void putRequest(final CommitLog.GroupCommitRequest request) { + synchronized (this.requestsWrite) { this.requestsWrite.add(request); - if (hasNotified.compareAndSet(false, true)) { - waitPoint.countDown(); // notify - } + } + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify } } @@ -273,22 +273,24 @@ public class HAService { } private void doWaitTransfer() { - if (!this.requestsRead.isEmpty()) { - for (CommitLog.GroupCommitRequest req : this.requestsRead) { - boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - for (int i = 0; !transferOK && i < 5; i++) { - this.notifyTransferObject.waitForRunning(1000); - transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - } + synchronized (this.requestsRead) { + if (!this.requestsRead.isEmpty()) { + for (CommitLog.GroupCommitRequest req : this.requestsRead) { + boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); + for (int i = 0; !transferOK && i < 5; i++) { + this.notifyTransferObject.waitForRunning(1000); + transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); + } - if (!transferOK) { - log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); + if (!transferOK) { + log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); + } + + req.wakeupCustomer(transferOK); } - req.wakeupCustomer(transferOK); + this.requestsRead.clear(); } - - this.requestsRead.clear(); } }