From 031347db7314b511ea7356ac892001ac1349489e Mon Sep 17 00:00:00 2001 From: Jaskey Date: Sat, 27 May 2017 11:21:09 +0800 Subject: [PATCH] [ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61 --- .../apache/rocketmq/common/BrokerConfig.java | 3 ++ .../org/apache/rocketmq/store/CommitLog.java | 40 +++-------------- .../apache/rocketmq/store/PutMessageLock.java | 25 +++++++++++ .../store/PutMessageReentrantLock.java | 37 ++++++++++++++++ .../rocketmq/store/PutMessageSpinLock.java | 43 +++++++++++++++++++ .../store/config/MessageStoreConfig.java | 4 ++ 6 files changed, 119 insertions(+), 33 deletions(-) create mode 100644 store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java create mode 100644 store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java create mode 100644 store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index f0a73bd2..5bce013d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -47,6 +47,9 @@ public class BrokerConfig { private boolean autoCreateSubscriptionGroup = true; private String messageStorePlugIn = ""; + /** + * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1. + */ private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4; private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; private int adminBrokerThreadPoolNums = 16; diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 7841febf..7b292631 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -23,8 +23,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; @@ -63,12 +61,7 @@ public class CommitLog { private volatile long confirmOffset = -1L; private volatile long beginTimeInLock = 0; - - //true: Can lock, false : in lock. - private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); - - private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync - + private final PutMessageLock putMessageLock; public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); @@ -88,6 +81,8 @@ public class CommitLog { return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); } }; + this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); + } public boolean load() { @@ -577,7 +572,7 @@ public class CommitLog { MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); - lockForPutMessage(); //spin... + putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; @@ -626,7 +621,7 @@ public class CommitLog { eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { - releasePutMessageLock(); + putMessageLock.unlock(); } if (eclipseTimeInLock > 500) { @@ -861,7 +856,7 @@ public class CommitLog { } public boolean appendData(long startOffset, byte[] data) { - lockForPutMessage(); //spin... + putMessageLock.lock(); try { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset); if (null == mappedFile) { @@ -871,7 +866,7 @@ public class CommitLog { return mappedFile.appendMessage(data); } finally { - releasePutMessageLock(); + putMessageLock.unlock(); } } @@ -906,28 +901,7 @@ public class CommitLog { return diff; } - /** - * Spin util acquired the lock. - */ - private void lockForPutMessage() { - if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { - putMessageNormalLock.lock(); - } else { - boolean flag; - do { - flag = this.putMessageSpinLock.compareAndSet(true, false); - } - while (!flag); - } - } - private void releasePutMessageLock() { - if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) { - putMessageNormalLock.unlock(); - } else { - this.putMessageSpinLock.compareAndSet(false, true); - } - } public static class GroupCommitRequest { private final long nextOffset; diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java new file mode 100644 index 00000000..a03e41ad --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + +/** + * Used when trying to put message + */ +public interface PutMessageLock { + void lock(); + void unlock(); +} diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java new file mode 100644 index 00000000..9198f1c6 --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.concurrent.locks.ReentrantLock; + +/** + * Exclusive lock implementation to put message + */ +public class PutMessageReentrantLock implements PutMessageLock { + private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync + + @Override + public void lock() { + putMessageNormalLock.lock(); + } + + @Override + public void unlock() { + putMessageNormalLock.unlock(); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java new file mode 100644 index 00000000..baa809db --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Spin lock Implementation to put message, suggest using this witb low race conditions + * + */ +public class PutMessageSpinLock implements PutMessageLock { + //true: Can lock, false : in lock. + private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); + + @Override + public void lock() { + boolean flag; + do { + flag = this.putMessageSpinLock.compareAndSet(true, false); + } + while (!flag); + } + + @Override + public void unlock() { + this.putMessageSpinLock.compareAndSet(false, true); + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 29f800c2..19ed2113 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -52,6 +52,10 @@ public class MessageStoreConfig { @ImportantField private int commitIntervalCommitLog = 200; + /** + * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.
+ * By default it is set to false indicating using spin lock when putting message. + */ private boolean useReentrantLockWhenPutMessage = false; // Whether schedule flush,default is real-time -- GitLab