提交 0b39fcad 编写于 作者: Y yukon

[ROCKETMQ-145][HOTFIX] Resolve concureent issue in HAService and GroupCommitService

上级 11653ce2
...@@ -1101,14 +1101,14 @@ public class CommitLog { ...@@ -1101,14 +1101,14 @@ public class CommitLog {
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
public void putRequest(final GroupCommitRequest request) { public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this) { synchronized (this.requestsWrite) {
this.requestsWrite.add(request); this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) { if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify waitPoint.countDown(); // notify
} }
} }
}
private void swapRequests() { private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite; List<GroupCommitRequest> tmp = this.requestsWrite;
...@@ -1117,6 +1117,7 @@ public class CommitLog { ...@@ -1117,6 +1117,7 @@ public class CommitLog {
} }
private void doCommit() { private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) { if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) { for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of // There may be a message in the next file, so a maximum of
...@@ -1145,6 +1146,7 @@ public class CommitLog { ...@@ -1145,6 +1146,7 @@ public class CommitLog {
CommitLog.this.mappedFileQueue.flush(0); CommitLog.this.mappedFileQueue.flush(0);
} }
} }
}
public void run() { public void run() {
CommitLog.log.info(this.getServiceName() + " service started"); CommitLog.log.info(this.getServiceName() + " service started");
......
...@@ -253,14 +253,14 @@ public class HAService { ...@@ -253,14 +253,14 @@ public class HAService {
private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>(); private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>();
private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>(); private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();
public void putRequest(final CommitLog.GroupCommitRequest request) { public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this) { synchronized (this.requestsWrite) {
this.requestsWrite.add(request); this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) { if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify waitPoint.countDown(); // notify
} }
} }
}
public void notifyTransferSome() { public void notifyTransferSome() {
this.notifyTransferObject.wakeup(); this.notifyTransferObject.wakeup();
...@@ -273,6 +273,7 @@ public class HAService { ...@@ -273,6 +273,7 @@ public class HAService {
} }
private void doWaitTransfer() { private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) { if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) { for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
...@@ -291,6 +292,7 @@ public class HAService { ...@@ -291,6 +292,7 @@ public class HAService {
this.requestsRead.clear(); this.requestsRead.clear();
} }
} }
}
public void run() { public void run() {
log.info(this.getServiceName() + " service started"); log.info(this.getServiceName() + " service started");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册