From 0b39fcadfa2950f1dd3975e1262ccd544f350750 Mon Sep 17 00:00:00 2001 From: yukon Date: Sat, 18 Mar 2017 15:34:39 +0800 Subject: [PATCH] [ROCKETMQ-145][HOTFIX] Resolve concureent issue in HAService and GroupCommitService --- .../org/apache/rocketmq/store/CommitLog.java | 54 ++++++++++--------- .../apache/rocketmq/store/ha/HAService.java | 36 +++++++------ 2 files changed, 47 insertions(+), 43 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index d81672f9..5be8258e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1101,12 +1101,12 @@ public class CommitLog { private volatile List requestsWrite = new ArrayList(); private volatile List requestsRead = new ArrayList(); - public void putRequest(final GroupCommitRequest request) { - synchronized (this) { + public synchronized void putRequest(final GroupCommitRequest request) { + synchronized (this.requestsWrite) { this.requestsWrite.add(request); - if (hasNotified.compareAndSet(false, true)) { - waitPoint.countDown(); // notify - } + } + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify } } @@ -1117,32 +1117,34 @@ public class CommitLog { } private void doCommit() { - if (!this.requestsRead.isEmpty()) { - for (GroupCommitRequest req : this.requestsRead) { - // There may be a message in the next file, so a maximum of - // two times the flush - boolean flushOK = false; - for (int i = 0; i < 2 && !flushOK; i++) { - flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); - - if (!flushOK) { - CommitLog.this.mappedFileQueue.flush(0); + synchronized (this.requestsRead) { + if (!this.requestsRead.isEmpty()) { + for (GroupCommitRequest req : this.requestsRead) { + // There may be a message in the next file, so a maximum of + // two times the flush + boolean flushOK = false; + for (int i = 0; i < 2 && !flushOK; i++) { + flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); + + if (!flushOK) { + CommitLog.this.mappedFileQueue.flush(0); + } } + + req.wakeupCustomer(flushOK); } - req.wakeupCustomer(flushOK); - } + long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); + if (storeTimestamp > 0) { + CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); + } - long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); - if (storeTimestamp > 0) { - CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); + this.requestsRead.clear(); + } else { + // Because of individual messages is set to not sync flush, it + // will come to this process + CommitLog.this.mappedFileQueue.flush(0); } - - this.requestsRead.clear(); - } else { - // Because of individual messages is set to not sync flush, it - // will come to this process - CommitLog.this.mappedFileQueue.flush(0); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index 762bdb6a..f507b368 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -253,12 +253,12 @@ public class HAService { private volatile List requestsWrite = new ArrayList<>(); private volatile List requestsRead = new ArrayList<>(); - public void putRequest(final CommitLog.GroupCommitRequest request) { - synchronized (this) { + public synchronized void putRequest(final CommitLog.GroupCommitRequest request) { + synchronized (this.requestsWrite) { this.requestsWrite.add(request); - if (hasNotified.compareAndSet(false, true)) { - waitPoint.countDown(); // notify - } + } + if (hasNotified.compareAndSet(false, true)) { + waitPoint.countDown(); // notify } } @@ -273,22 +273,24 @@ public class HAService { } private void doWaitTransfer() { - if (!this.requestsRead.isEmpty()) { - for (CommitLog.GroupCommitRequest req : this.requestsRead) { - boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - for (int i = 0; !transferOK && i < 5; i++) { - this.notifyTransferObject.waitForRunning(1000); - transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); - } + synchronized (this.requestsRead) { + if (!this.requestsRead.isEmpty()) { + for (CommitLog.GroupCommitRequest req : this.requestsRead) { + boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); + for (int i = 0; !transferOK && i < 5; i++) { + this.notifyTransferObject.waitForRunning(1000); + transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); + } - if (!transferOK) { - log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); + if (!transferOK) { + log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); + } + + req.wakeupCustomer(transferOK); } - req.wakeupCustomer(transferOK); + this.requestsRead.clear(); } - - this.requestsRead.clear(); } } -- GitLab