未验证 提交 d65778f3 编写于 作者: H huangli 提交者: GitHub

[ISSUE #2883] [Part B] Improve produce performance in M/S mode. (#2885)

* Optimise lock in WaitNotifyObject

* Remove lock in HAService

* Remove lock in GroupCommitService
上级 cba3e05d
...@@ -19,8 +19,8 @@ package org.apache.rocketmq.store; ...@@ -19,8 +19,8 @@ package org.apache.rocketmq.store;
import java.net.Inet6Address; import java.net.Inet6Address;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
...@@ -1403,48 +1403,55 @@ public class CommitLog { ...@@ -1403,48 +1403,55 @@ public class CommitLog {
* GroupCommit Service * GroupCommit Service
*/ */
class GroupCommitService extends FlushCommitLogService { class GroupCommitService extends FlushCommitLogService {
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
private final PutMessageSpinLock lock = new PutMessageSpinLock();
public synchronized void putRequest(final GroupCommitRequest request) { public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) { lock.lock();
try {
this.requestsWrite.add(request); this.requestsWrite.add(request);
} finally {
lock.unlock();
} }
this.wakeup(); this.wakeup();
} }
private void swapRequests() { private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite; lock.lock();
this.requestsWrite = this.requestsRead; try {
this.requestsRead = tmp; LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
} finally {
lock.unlock();
}
} }
private void doCommit() { private void doCommit() {
synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) {
if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) {
for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of
// There may be a message in the next file, so a maximum of // two times the flush
// two times the flush boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); for (int i = 0; i < 2 && !flushOK; i++) {
for (int i = 0; i < 2 && !flushOK; i++) { CommitLog.this.mappedFileQueue.flush(0);
CommitLog.this.mappedFileQueue.flush(0); flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
} }
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
if (storeTimestamp > 0) { }
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear(); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
} else { if (storeTimestamp > 0) {
// Because of individual messages is set to not sync flush, it CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
} }
this.requestsRead = new LinkedList<>();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
} }
} }
......
...@@ -25,7 +25,6 @@ import java.nio.channels.SelectionKey; ...@@ -25,7 +25,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
...@@ -39,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -39,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.PutMessageSpinLock;
import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.PutMessageStatus;
public class HAService { public class HAService {
...@@ -254,12 +254,16 @@ public class HAService { ...@@ -254,12 +254,16 @@ public class HAService {
class GroupTransferService extends ServiceThread { class GroupTransferService extends ServiceThread {
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject(); private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>(); private final PutMessageSpinLock lock = new PutMessageSpinLock();
private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>(); private volatile LinkedList<CommitLog.GroupCommitRequest> requestsWrite = new LinkedList<>();
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) { public void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this.requestsWrite) { lock.lock();
try {
this.requestsWrite.add(request); this.requestsWrite.add(request);
} finally {
lock.unlock();
} }
this.wakeup(); this.wakeup();
} }
...@@ -269,32 +273,35 @@ public class HAService { ...@@ -269,32 +273,35 @@ public class HAService {
} }
private void swapRequests() { private void swapRequests() {
List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite; lock.lock();
this.requestsWrite = this.requestsRead; try {
this.requestsRead = tmp; LinkedList<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
} finally {
lock.unlock();
}
} }
private void doWaitTransfer() { private void doWaitTransfer() {
synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) {
if (!this.requestsRead.isEmpty()) { for (CommitLog.GroupCommitRequest req : this.requestsRead) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) { boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(); + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) { while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000); this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
} }
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT); if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
} }
this.requestsRead.clear(); req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
} }
this.requestsRead = new LinkedList<>();
} }
} }
......
...@@ -20,40 +20,43 @@ import org.apache.rocketmq.common.constant.LoggerName; ...@@ -20,40 +20,43 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class WaitNotifyObject { public class WaitNotifyObject {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
protected final HashMap<Long/* thread id */, Boolean/* notified */> waitingThreadTable = protected final ConcurrentHashMap<Long/* thread id */, AtomicBoolean/* notified */> waitingThreadTable =
new HashMap<Long, Boolean>(16); new ConcurrentHashMap<Long, AtomicBoolean>(16);
protected volatile boolean hasNotified = false; protected AtomicBoolean hasNotified = new AtomicBoolean(false);
public void wakeup() { public void wakeup() {
synchronized (this) { boolean needNotify = hasNotified.compareAndSet(false, true);
if (!this.hasNotified) { if (needNotify) {
this.hasNotified = true; synchronized (this) {
this.notify(); this.notify();
} }
} }
} }
protected void waitForRunning(long interval) { protected void waitForRunning(long interval) {
if (this.hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
synchronized (this) { synchronized (this) {
if (this.hasNotified) {
this.hasNotified = false;
this.onWaitEnd();
return;
}
try { try {
if (this.hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
this.wait(interval); this.wait(interval);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Interrupted", e); log.error("Interrupted", e);
} finally { } finally {
this.hasNotified = false; this.hasNotified.set(false);
this.onWaitEnd(); this.onWaitEnd();
} }
} }
...@@ -63,15 +66,14 @@ public class WaitNotifyObject { ...@@ -63,15 +66,14 @@ public class WaitNotifyObject {
} }
public void wakeupAll() { public void wakeupAll() {
synchronized (this) { boolean needNotify = false;
boolean needNotify = false; for (Map.Entry<Long,AtomicBoolean> entry : this.waitingThreadTable.entrySet()) {
if (entry.getValue().compareAndSet(false, true)) {
for (Map.Entry<Long,Boolean> entry : this.waitingThreadTable.entrySet()) { needNotify = true;
needNotify = needNotify || !entry.getValue();
entry.setValue(true);
} }
}
if (needNotify) { if (needNotify) {
synchronized (this) {
this.notifyAll(); this.notifyAll();
} }
} }
...@@ -79,20 +81,22 @@ public class WaitNotifyObject { ...@@ -79,20 +81,22 @@ public class WaitNotifyObject {
public void allWaitForRunning(long interval) { public void allWaitForRunning(long interval) {
long currentThreadId = Thread.currentThread().getId(); long currentThreadId = Thread.currentThread().getId();
AtomicBoolean notified = this.waitingThreadTable.computeIfAbsent(currentThreadId, k -> new AtomicBoolean(false));
if (notified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
synchronized (this) { synchronized (this) {
Boolean notified = this.waitingThreadTable.get(currentThreadId);
if (notified != null && notified) {
this.waitingThreadTable.put(currentThreadId, false);
this.onWaitEnd();
return;
}
try { try {
if (notified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
this.wait(interval); this.wait(interval);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Interrupted", e); log.error("Interrupted", e);
} finally { } finally {
this.waitingThreadTable.put(currentThreadId, false); notified.set(false);
this.onWaitEnd(); this.onWaitEnd();
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册