diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index f0a73bd289bca5d39b7c28c51caf5ffdd4a6d146..5bce013da80cacb78a3065250584153a33f744c7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -47,6 +47,9 @@ public class BrokerConfig { private boolean autoCreateSubscriptionGroup = true; private String messageStorePlugIn = ""; + /** + * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1. + */ private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4; private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; private int adminBrokerThreadPoolNums = 16; diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 7841febf7086374fb16ad988db555231d529f2f9..7b2926318a2dcd44f6acff02ed23941366bcca78 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -63,12 +61,7 @@ public class CommitLog { private volatile long confirmOffset = -1L; private volatile long beginTimeInLock = 0; - - //true: Can lock, false : in lock. - private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); - - private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync - + private final PutMessageLock putMessageLock; public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); @@ -88,6 +81,8 @@ public class CommitLog { return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); } }; + this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); + } public boolean load() { @@ -577,7 +572,7 @@ public class CommitLog { MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); - lockForPutMessage(); //spin... + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; @@ -626,7 +621,7 @@ public class CommitLog { eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { - releasePutMessageLock(); + putMessageLock.unlock(); } if (eclipseTimeInLock > 500) { @@ -861,7 +856,7 @@ public class CommitLog { } public boolean appendData(long startOffset, byte[] data) { - lockForPutMessage(); //spin... + putMessageLock.lock(); try { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset); if (null == mappedFile) { @@ -871,7 +866,7 @@ public class CommitLog { return mappedFile.appendMessage(data); } finally { - releasePutMessageLock(); + putMessageLock.unlock(); } } @@ -906,28 +901,7 @@ public class CommitLog { return diff; } - /** - * Spin util acquired the lock. - */ - private void lockForPutMessage() { - if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { - putMessageNormalLock.lock(); - } else { - boolean flag; - do { - flag = this.putMessageSpinLock.compareAndSet(true, false); - } - while (!flag); - } - } - private void releasePutMessageLock() { - if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { - putMessageNormalLock.unlock(); - } else { - this.putMessageSpinLock.compareAndSet(false, true); - } - } public static class GroupCommitRequest { private final long nextOffset; diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java new file mode 100644 index 0000000000000000000000000000000000000000..a03e41adf7291f60de527709d3d59a7385c0c655 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +/** + * Used when trying to put message + */ +public interface PutMessageLock { + void lock(); + void unlock(); +} diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java new file mode 100644 index 0000000000000000000000000000000000000000..9198f1c6f51703224d7fa8e36a14d7a98ad3df5c --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.concurrent.locks.ReentrantLock; + +/** + * Exclusive lock implementation to put message + */ +public class PutMessageReentrantLock implements PutMessageLock { + private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync + + @Override + public void lock() { + putMessageNormalLock.lock(); + } + + @Override + public void unlock() { + putMessageNormalLock.unlock(); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java new file mode 100644 index 0000000000000000000000000000000000000000..baa809db9e03926dda207bcad02970d0960faed2 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Spin lock Implementation to put message, suggest using this witb low race conditions + * + */ +public class PutMessageSpinLock implements PutMessageLock { + //true: Can lock, false : in lock. + private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); + + @Override + public void lock() { + boolean flag; + do { + flag = this.putMessageSpinLock.compareAndSet(true, false); + } + while (!flag); + } + + @Override + public void unlock() { + this.putMessageSpinLock.compareAndSet(false, true); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 29f800c205d3340ba4454e602a86d0163ea00080..19ed2113dfaef48da58fb04b9b2a40fc58648d06 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -52,6 +52,10 @@ public class MessageStoreConfig { @ImportantField private int commitIntervalCommitLog = 200; + /** + * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.
+ * By default it is set to false indicating using spin lock when putting message. + */ private boolean useReentrantLockWhenPutMessage = false; // Whether schedule flush,default is real-time