未验证 提交 2f26fc1c 编写于 作者: J Jiang Tian 提交者: GitHub

[IOTDB-30]Bug fix: use two maps to avoid getting wrong FileReader (#78)

* Bug fix: use two maps to avoid getting wrong FileReader

* fix by comments

* fix a null pointer problem
上级 07c8fce1
......@@ -848,7 +848,6 @@ public class FileNodeManager implements IStatistic, IService {
fileNodePath = standardizeDir(fileNodePath) + processorName;
FileUtils.deleteDirectory(new File(fileNodePath));
cleanBufferWrite(processorName);
cleanBufferWrite(processorName);
MultiFileLogNodeManager.getInstance()
......
......@@ -47,22 +47,34 @@ public class FileReaderManager implements IService {
private static final int MAX_CACHED_FILE_SIZE = 30000;
/**
* the key of fileReaderMap is the file path and the value of fileReaderMap is the corresponding
* reader.
* the key of closedFileReaderMap is the file path and the value of closedFileReaderMap
* is the corresponding reader.
*/
private ConcurrentHashMap<String, TsFileSequenceReader> fileReaderMap;
private ConcurrentHashMap<String, TsFileSequenceReader> closedFileReaderMap;
/**
* the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap
* is the corresponding reader.
*/
private ConcurrentHashMap<String, TsFileSequenceReader> unclosedFileReaderMap;
/**
* the key of fileReaderMap is the file path and the value of fileReaderMap is the file's
* reference count.
* the key of closedFileReaderMap is the file path and the value of closedFileReaderMap
* is the file's reference count.
*/
private ConcurrentHashMap<String, AtomicInteger> closedReferenceMap;
/**
* the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap
* is the file's reference count.
*/
private ConcurrentHashMap<String, AtomicInteger> referenceMap;
private ConcurrentHashMap<String, AtomicInteger> unclosedReferenceMap;
private ScheduledExecutorService executorService;
private FileReaderManager() {
fileReaderMap = new ConcurrentHashMap<>();
referenceMap = new ConcurrentHashMap<>();
closedFileReaderMap = new ConcurrentHashMap<>();
unclosedFileReaderMap = new ConcurrentHashMap<>();
closedReferenceMap = new ConcurrentHashMap<>();
unclosedReferenceMap = new ConcurrentHashMap<>();
executorService = IoTDBThreadPoolFactory.newScheduledThreadPool(1,
"opended-files-manager");
......@@ -79,76 +91,99 @@ public class FileReaderManager implements IService {
executorService.scheduleAtFixedRate(() -> {
synchronized (this) {
for (Map.Entry<String, TsFileSequenceReader> entry : fileReaderMap.entrySet()) {
TsFileSequenceReader reader = entry.getValue();
int referenceNum = referenceMap.get(entry.getKey()).get();
if (referenceNum == 0) {
try {
reader.close();
} catch (IOException e) {
LOGGER.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
}
fileReaderMap.remove(entry.getKey());
referenceMap.remove(entry.getKey());
}
}
clearMap(unclosedFileReaderMap, unclosedReferenceMap);
clearMap(closedFileReaderMap, closedReferenceMap);
}
}, 0, examinePeriod, TimeUnit.MILLISECONDS);
}
private void clearMap(Map<String, TsFileSequenceReader> readerMap,
Map<String, AtomicInteger> refMap) {
for (Map.Entry<String, TsFileSequenceReader> entry : readerMap.entrySet()) {
TsFileSequenceReader reader = entry.getValue();
int referenceNum = refMap.get(entry.getKey()).get();
if (referenceNum == 0) {
try {
reader.close();
} catch (IOException e) {
LOGGER.error("Can not close TsFileSequenceReader {} !", reader.getFileName(), e);
}
readerMap.remove(entry.getKey());
refMap.remove(entry.getKey());
}
}
}
/**
* Get the reader of the file(tsfile or unseq tsfile) indicated by filePath. If the reader already
* exists, just get it from fileReaderMap. Otherwise a new reader will be created.
* exists, just get it from closedFileReaderMap or unclosedFileReaderMap depending on isClosed .
* Otherwise a new reader will be created and cached.
*
* @param filePath the path of the file, of which the reader is desired.
* @param isUnClosed whether the corresponding file still receives insertions or not.
* @param isClosed whether the corresponding file still receives insertions or not.
* @return the reader of the file specified by filePath.
* @throws IOException when reader cannot be created.
*/
public synchronized TsFileSequenceReader get(String filePath, boolean isUnClosed)
public synchronized TsFileSequenceReader get(String filePath, boolean isClosed)
throws IOException {
if (!fileReaderMap.containsKey(filePath)) {
Map<String, TsFileSequenceReader> readerMap = !isClosed ? unclosedFileReaderMap
: closedFileReaderMap;
if (!readerMap.containsKey(filePath)) {
if (fileReaderMap.size() >= MAX_CACHED_FILE_SIZE) {
LOGGER.warn("Query has opened {} files !", fileReaderMap.size());
if (readerMap.size() >= MAX_CACHED_FILE_SIZE) {
LOGGER.warn("Query has opened {} files !", readerMap.size());
}
TsFileSequenceReader tsFileReader = isUnClosed ? new UnClosedTsFileReader(filePath)
TsFileSequenceReader tsFileReader = !isClosed ? new UnClosedTsFileReader(filePath)
: new TsFileSequenceReader(filePath);
fileReaderMap.put(filePath, tsFileReader);
readerMap.put(filePath, tsFileReader);
return tsFileReader;
}
return fileReaderMap.get(filePath);
return readerMap.get(filePath);
}
/**
* Increase the reference count of the reader specified by filePath. Only when the reference count
* of a reader equals zero, the reader can be closed and removed.
*/
public synchronized void increaseFileReaderReference(String filePath) {
referenceMap.computeIfAbsent(filePath, k -> new AtomicInteger()).getAndIncrement();
public synchronized void increaseFileReaderReference(String filePath, boolean isClosed) {
if (!isClosed) {
unclosedReferenceMap.computeIfAbsent(filePath, k -> new AtomicInteger()).getAndIncrement();
} else {
closedReferenceMap.computeIfAbsent(filePath, k -> new AtomicInteger()).getAndIncrement();
}
}
/**
* Decrease the reference count of the reader specified by filePath. This method is latch-free.
* Only when the reference count of a reader equals zero, the reader can be closed and removed.
*/
public synchronized void decreaseFileReaderReference(String filePath) {
referenceMap.get(filePath).getAndDecrement();
public synchronized void decreaseFileReaderReference(String filePath, boolean isClosed) {
if (!isClosed && unclosedReferenceMap.containsKey(filePath)) {
unclosedReferenceMap.get(filePath).getAndDecrement();
} else if (closedReferenceMap.containsKey(filePath)){
closedReferenceMap.get(filePath).getAndDecrement();
}
}
/**
* This method is used when the given file path is deleted.
*/
public synchronized void closeFileAndRemoveReader(String filePath) throws IOException {
if (fileReaderMap.containsKey(filePath)) {
referenceMap.remove(filePath);
fileReaderMap.get(filePath).close();
fileReaderMap.remove(filePath);
public synchronized void closeFileAndRemoveReader(String filePath)
throws IOException {
if (unclosedFileReaderMap.containsKey(filePath)) {
unclosedReferenceMap.remove(filePath);
unclosedFileReaderMap.get(filePath).close();
unclosedFileReaderMap.remove(filePath);
}
if (closedFileReaderMap.containsKey(filePath)) {
closedReferenceMap.remove(filePath);
closedFileReaderMap.get(filePath).close();
closedFileReaderMap.remove(filePath);
}
}
......@@ -157,18 +192,24 @@ public class FileReaderManager implements IService {
* integration tests will not conflict with each other.
*/
public synchronized void closeAndRemoveAllOpenedReaders() throws IOException {
for (Map.Entry<String, TsFileSequenceReader> entry : fileReaderMap.entrySet()) {
for (Map.Entry<String, TsFileSequenceReader> entry : closedFileReaderMap.entrySet()) {
entry.getValue().close();
closedReferenceMap.remove(entry.getKey());
closedFileReaderMap.remove(entry.getKey());
}
for (Map.Entry<String, TsFileSequenceReader> entry : unclosedFileReaderMap.entrySet()) {
entry.getValue().close();
referenceMap.remove(entry.getKey());
fileReaderMap.remove(entry.getKey());
unclosedReferenceMap.remove(entry.getKey());
unclosedFileReaderMap.remove(entry.getKey());
}
}
/**
* This method is only for unit tests.
*/
public synchronized boolean contains(String filePath) {
return fileReaderMap.containsKey(filePath);
public synchronized boolean contains(String filePath, boolean isClosed) {
return (isClosed && closedFileReaderMap.containsKey(filePath))
|| (!isClosed && unclosedFileReaderMap.containsKey(filePath));
}
@Override
......
......@@ -40,11 +40,13 @@ public class OpenedFilePathsManager {
/**
* Map<jobId, Set<filePaths>>
*/
private ConcurrentHashMap<Long, Set<String>> filePathsMap;
private ConcurrentHashMap<Long, Set<String>> closedFilePathsMap;
private ConcurrentHashMap<Long, Set<String>> unclosedFilePathsMap;
private OpenedFilePathsManager() {
jobIdContainer = new ThreadLocal<>();
filePathsMap = new ConcurrentHashMap<>();
closedFilePathsMap = new ConcurrentHashMap<>();
unclosedFilePathsMap = new ConcurrentHashMap<>();
}
public static OpenedFilePathsManager getInstance() {
......@@ -56,27 +58,29 @@ public class OpenedFilePathsManager {
*/
public void setJobIdForCurrentRequestThread(long jobId) {
jobIdContainer.set(jobId);
filePathsMap.put(jobId, new HashSet<>());
closedFilePathsMap.put(jobId, new HashSet<>());
unclosedFilePathsMap.put(jobId, new HashSet<>());
}
/**
* Add the unique file paths to filePathsMap.
* Add the unique file paths to closedFilePathsMap and unclosedFilePathsMap.
*/
public void addUsedFilesForCurrentRequestThread(long jobId, QueryDataSource dataSource) {
for (IntervalFileNode intervalFileNode : dataSource.getSeqDataSource().getSealedTsFiles()) {
String sealedFilePath = intervalFileNode.getFilePath();
addFilePathToMap(jobId, sealedFilePath);
addFilePathToMap(jobId, sealedFilePath, true);
}
if (dataSource.getSeqDataSource().hasUnsealedTsFile()) {
String unSealedFilePath = dataSource.getSeqDataSource().getUnsealedTsFile().getFilePath();
addFilePathToMap(jobId, unSealedFilePath);
addFilePathToMap(jobId, unSealedFilePath, false);
}
for (OverflowInsertFile overflowInsertFile : dataSource.getOverflowSeriesDataSource()
.getOverflowInsertFileList()) {
String overflowFilePath = overflowInsertFile.getFilePath();
addFilePathToMap(jobId, overflowFilePath);
// overflow is unclosed by default
addFilePathToMap(jobId, overflowFilePath, false);
}
}
......@@ -89,22 +93,29 @@ public class OpenedFilePathsManager {
long jobId = jobIdContainer.get();
jobIdContainer.remove();
for (String filePath : filePathsMap.get(jobId)) {
FileReaderManager.getInstance().decreaseFileReaderReference(filePath);
for (String filePath : closedFilePathsMap.get(jobId)) {
FileReaderManager.getInstance().decreaseFileReaderReference(filePath, false);
}
filePathsMap.remove(jobId);
closedFilePathsMap.remove(jobId);
for (String filePath : unclosedFilePathsMap.get(jobId)) {
FileReaderManager.getInstance().decreaseFileReaderReference(filePath, true);
}
unclosedFilePathsMap.remove(jobId);
}
}
/**
* Increase the usage reference of filePath of job id. Before the invoking of this method,
* <code>this.setJobIdForCurrentRequestThread</code> has been invoked, so <code>filePathsMap.get(jobId)</code> must
* not return null.
* <code>this.setJobIdForCurrentRequestThread</code> has been invoked,
* so <code>closedFilePathsMap.get(jobId)</code> or <code>unclosedFilePathsMap.get(jobId)</code>
* must not return null.
*/
public void addFilePathToMap(long jobId, String filePath) {
if (!filePathsMap.get(jobId).contains(filePath)) {
filePathsMap.get(jobId).add(filePath);
FileReaderManager.getInstance().increaseFileReaderReference(filePath);
public void addFilePathToMap(long jobId, String filePath, boolean isClosed) {
ConcurrentHashMap<Long, Set<String>> pathMap = !isClosed ? unclosedFilePathsMap :
closedFilePathsMap;
if (!pathMap.get(jobId).contains(filePath)) {
pathMap.get(jobId).add(filePath);
FileReaderManager.getInstance().increaseFileReaderReference(filePath, isClosed);
}
}
......
......@@ -81,7 +81,7 @@ public class SeriesReaderFactory {
// store only one opened file stream into manager, to avoid too many opened files
TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance()
.get(overflowInsertFile.getFilePath(), true);
.get(overflowInsertFile.getFilePath(), false);
ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
......@@ -161,7 +161,7 @@ public class SeriesReaderFactory {
QueryContext context)
throws IOException {
TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
.get(fileNode.getFilePath(), false);
.get(fileNode.getFilePath(), true);
ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);
MetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(tsFileSequenceReader);
List<ChunkMetaData> metaDataList = metadataQuerier
......
......@@ -171,7 +171,7 @@ public class SealedTsFilesReader implements IReader {
// to avoid too many opened files
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
.get(fileNode.getFilePath(), false);
.get(fileNode.getFilePath(), true);
MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
......
......@@ -50,7 +50,7 @@ public class UnSealedTsFileReader implements IReader {
TsFileSequenceReader unClosedTsFileReader = FileReaderManager.getInstance()
.get(unsealedTsFile.getFilePath(),
true);
false);
ChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
if (filter == null) {
......
......@@ -567,7 +567,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
recordANewQuery(statement, plan);
return resp;
} catch (Exception e) {
LOGGER.error("{}: Internal server error: {}", IoTDBConstant.GLOBAL_DB_NAME, e.getMessage());
LOGGER.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
}
}
......@@ -605,7 +605,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
resp.setQueryDataSet(result);
return resp;
} catch (Exception e) {
LOGGER.error("{}: Internal server error: {}", IoTDBConstant.GLOBAL_DB_NAME, e.getMessage());
LOGGER.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return getTSFetchResultsResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
}
}
......@@ -623,7 +623,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
LOGGER.error("meet error while executing update statement.", e);
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
} catch (Exception e) {
LOGGER.error("{}: server Internal Error: {}", IoTDBConstant.GLOBAL_DB_NAME, e.getMessage());
LOGGER.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return getTSExecuteStatementResp(TS_StatusCode.ERROR_STATUS, e.getMessage());
}
}
......
......@@ -65,9 +65,10 @@ public class FileReaderManagerTest {
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(1L);
for (int i = 1; i <= 6; i++) {
OpenedFilePathsManager.getInstance().addFilePathToMap(1L, filePath + i);
manager.get(filePath + i, true);
Assert.assertTrue(manager.contains(filePath + i));
OpenedFilePathsManager.getInstance().addFilePathToMap(1L, filePath + i,
false);
manager.get(filePath + i, false);
Assert.assertTrue(manager.contains(filePath + i, false));
}
} catch (IOException e) {
......@@ -82,9 +83,10 @@ public class FileReaderManagerTest {
OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(2L);
for (int i = 4; i <= MAX_FILE_SIZE; i++) {
OpenedFilePathsManager.getInstance().addFilePathToMap(2L, filePath + i);
manager.get(filePath + i, true);
Assert.assertTrue(manager.contains(filePath + i));
OpenedFilePathsManager.getInstance().addFilePathToMap(2L, filePath + i,
false);
manager.get(filePath + i, false);
Assert.assertTrue(manager.contains(filePath + i, false));
}
} catch (IOException e) {
......@@ -98,11 +100,11 @@ public class FileReaderManagerTest {
t2.join();
for (int i = 1; i <= MAX_FILE_SIZE; i++) {
Assert.assertTrue(manager.contains(filePath + i));
Assert.assertTrue(manager.contains(filePath + i, false));
}
for (int i = 1; i <= MAX_FILE_SIZE; i++) {
manager.decreaseFileReaderReference(filePath + i);
manager.decreaseFileReaderReference(filePath + i, true);
}
// the code below is not valid because the cacheFileReaderClearPeriod config in this class is not valid
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册