提交 b1fcf1b8 编写于 作者: J Jaskey 提交者: dongeforever

[ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes...

[ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61
上级 0adad6f0
......@@ -47,6 +47,9 @@ public class BrokerConfig {
private boolean autoCreateSubscriptionGroup = true;
private String messageStorePlugIn = "";
/**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int adminBrokerThreadPoolNums = 16;
......
......@@ -23,8 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
......@@ -63,12 +61,7 @@ public class CommitLog {
private volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
//true: Can lock, false : in lock.
private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
private final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
......@@ -88,6 +81,8 @@ public class CommitLog {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
public boolean load() {
......@@ -577,7 +572,7 @@ public class CommitLog {
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
lockForPutMessage(); //spin...
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
......@@ -626,7 +621,7 @@ public class CommitLog {
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
releasePutMessageLock();
putMessageLock.unlock();
}
if (eclipseTimeInLock > 500) {
......@@ -861,7 +856,7 @@ public class CommitLog {
}
public boolean appendData(long startOffset, byte[] data) {
lockForPutMessage(); //spin...
putMessageLock.lock();
try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
if (null == mappedFile) {
......@@ -871,7 +866,7 @@ public class CommitLog {
return mappedFile.appendMessage(data);
} finally {
releasePutMessageLock();
putMessageLock.unlock();
}
}
......@@ -906,28 +901,7 @@ public class CommitLog {
return diff;
}
/**
* Spin util acquired the lock.
*/
private void lockForPutMessage() {
if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
putMessageNormalLock.lock();
} else {
boolean flag;
do {
flag = this.putMessageSpinLock.compareAndSet(true, false);
}
while (!flag);
}
}
private void releasePutMessageLock() {
if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
putMessageNormalLock.unlock();
} else {
this.putMessageSpinLock.compareAndSet(false, true);
}
}
public static class GroupCommitRequest {
private final long nextOffset;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
/**
* Used when trying to put message
*/
public interface PutMessageLock {
void lock();
void unlock();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.util.concurrent.locks.ReentrantLock;
/**
* Exclusive lock implementation to put message
*/
public class PutMessageReentrantLock implements PutMessageLock {
private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
@Override
public void lock() {
putMessageNormalLock.lock();
}
@Override
public void unlock() {
putMessageNormalLock.unlock();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Spin lock Implementation to put message, suggest using this witb low race conditions
*
*/
public class PutMessageSpinLock implements PutMessageLock {
//true: Can lock, false : in lock.
private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
@Override
public void lock() {
boolean flag;
do {
flag = this.putMessageSpinLock.compareAndSet(true, false);
}
while (!flag);
}
@Override
public void unlock() {
this.putMessageSpinLock.compareAndSet(false, true);
}
}
......@@ -52,6 +52,10 @@ public class MessageStoreConfig {
@ImportantField
private int commitIntervalCommitLog = 200;
/**
* introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
* By default it is set to false indicating using spin lock when putting message.
*/
private boolean useReentrantLockWhenPutMessage = false;
// Whether schedule flush,default is real-time
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册