未验证 提交 31f636f4 编写于 作者: J Jackie Tien 提交者: GitHub

[IOTDB-2413] Fix file handler not released bug (#4889)

上级 fcd211a7
......@@ -406,6 +406,10 @@ RESAMPLE
: R E S A M P L E
;
RESOURCE
: R E S O U R C E
;
REVOKE
: R E V O K E
;
......
......@@ -58,7 +58,7 @@ dclStatement
utilityStatement
: merge | fullMerge | flush | clearCache | settle
| setSystemStatus | showVersion | showFlushInfo | showLockInfo
| setSystemStatus | showVersion | showFlushInfo | showLockInfo | showQueryResource
| showQueryProcesslist | killQuery | grantWatermarkEmbedding | revokeWatermarkEmbedding
| loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile;
......@@ -600,6 +600,11 @@ showLockInfo
;
// Show Query Resource
showQueryResource
: SHOW QUERY RESOURCE
;
// Show Query Processlist
showQueryProcesslist
: SHOW QUERY PROCESSLIST
......
......@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine.storagegroup.timeindex;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
......@@ -81,8 +80,7 @@ public class FileTimeIndex implements ITimeIndex {
@Override
public Set<String> getDevices(String tsFilePath) {
try {
TsFileSequenceReader fileReader = FileReaderManager.getInstance().get(tsFilePath, true);
try (TsFileSequenceReader fileReader = new TsFileSequenceReader(tsFilePath)) {
return new HashSet<>(fileReader.getAllDevices());
} catch (IOException e) {
logger.error("Can't read file {} from disk ", tsFilePath, e);
......
......@@ -190,6 +190,8 @@ public class SQLConstant {
public static final int TOK_SCHEMA_TEMPLATE_ACTIVATE = 114;
public static final int TOK_SCHEMA_TEMPLATE_UNSET = 115;
public static final int TOK_SHOW_QUERY_RESOURCE = 116;
public static final Map<Integer, String> tokenNames = new HashMap<>();
public static String[] getSingleRootArray() {
......@@ -267,6 +269,8 @@ public class SQLConstant {
tokenNames.put(TOK_SCHEMA_TEMPLATE_SET, "TOK_SCHEMA_TEMPLATE_SET");
tokenNames.put(TOK_SCHEMA_TEMPLATE_ACTIVATE, "TOK_SCHEMA_TEMPLATE_ACTIVATE");
tokenNames.put(TOK_SCHEMA_TEMPLATE_UNSET, "TOK_SCHEMA_TEMPLATE_UNSET");
tokenNames.put(TOK_SHOW_QUERY_RESOURCE, "TOK_SHOW_QUERY_RESOURCE");
}
public static boolean isReservedPath(PartialPath pathStr) {
......
......@@ -121,6 +121,8 @@ import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.control.QueryTimeManager;
import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
import org.apache.iotdb.db.query.dataset.ListDataSet;
......@@ -172,6 +174,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
......@@ -220,6 +223,7 @@ public class PlanExecutor implements IPlanExecutor {
private static final Logger logger = LoggerFactory.getLogger(PlanExecutor.class);
private static final Logger AUDIT_LOGGER =
LoggerFactory.getLogger(IoTDBConstant.AUDIT_LOGGER_NAME);
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
// for data query
protected IQueryRouter queryRouter;
// for administration
......@@ -387,6 +391,8 @@ public class PlanExecutor implements IPlanExecutor {
case SETTLE:
settle((SettlePlan) plan);
return true;
case SHOW_QUERY_RESOURCE:
return processShowQueryResource();
default:
throw new UnsupportedOperationException(
String.format("operation %s is not supported", plan.getOperatorType()));
......@@ -2121,6 +2127,14 @@ public class PlanExecutor implements IPlanExecutor {
IoTDBDescriptor.getInstance().loadHotModifiedProps();
}
private boolean processShowQueryResource() {
DEBUG_LOGGER.info(String.format("**********%s**********\n\n", new Date()));
FileReaderManager.getInstance().writeFileReferenceInfo();
QueryResourceManager.getInstance().writeQueryFileInfo();
DEBUG_LOGGER.info("\n****************************************************\n\n");
return true;
}
private QueryDataSet processShowQueryProcesslist() {
ListDataSet listDataSet =
new ListDataSet(
......
......@@ -192,6 +192,8 @@ public abstract class Operator {
UNSET_TEMPLATE,
PRUNE_TEMPLATE,
APPEND_TEMPLATE,
DROP_TEMPLATE
DROP_TEMPLATE,
SHOW_QUERY_RESOURCE
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.qp.logical.sys;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowQueryResourcePlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
public class ShowQueryResourceOperate extends Operator {
public ShowQueryResourceOperate(int tokenIntType) {
super(tokenIntType);
setOperatorType(OperatorType.SHOW_QUERY_RESOURCE);
}
@Override
public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator)
throws QueryProcessException {
return new ShowQueryResourcePlan();
}
}
......@@ -116,6 +116,7 @@ public class ShowPlan extends PhysicalPlan {
QUERY_PROCESSLIST,
TRIGGERS,
LOCK_INFO,
CONTINUOUS_QUERY
CONTINUOUS_QUERY,
QUERY_RESOURCE
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.qp.physical.sys;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import java.util.Collections;
import java.util.List;
public class ShowQueryResourcePlan extends PhysicalPlan {
public ShowQueryResourcePlan() {
super(Operator.OperatorType.SHOW_QUERY_RESOURCE);
}
@Override
public List<PartialPath> getPaths() {
return Collections.emptyList();
}
}
......@@ -53,30 +53,9 @@ import org.apache.iotdb.db.qp.logical.crud.SpecialClauseComponent;
import org.apache.iotdb.db.qp.logical.crud.UDAFQueryOperator;
import org.apache.iotdb.db.qp.logical.crud.UDTFQueryOperator;
import org.apache.iotdb.db.qp.logical.crud.WhereComponent;
import org.apache.iotdb.db.qp.logical.sys.ActivateTemplateOperator;
import org.apache.iotdb.db.qp.logical.sys.AlterTimeSeriesOperator;
import org.apache.iotdb.db.qp.logical.sys.*;
import org.apache.iotdb.db.qp.logical.sys.AlterTimeSeriesOperator.AlterType;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
import org.apache.iotdb.db.qp.logical.sys.ClearCacheOperator;
import org.apache.iotdb.db.qp.logical.sys.CountOperator;
import org.apache.iotdb.db.qp.logical.sys.CreateAlignedTimeSeriesOperator;
import org.apache.iotdb.db.qp.logical.sys.CreateContinuousQueryOperator;
import org.apache.iotdb.db.qp.logical.sys.CreateFunctionOperator;
import org.apache.iotdb.db.qp.logical.sys.CreateSnapshotOperator;
import org.apache.iotdb.db.qp.logical.sys.CreateTemplateOperator;
import org.apache.iotdb.db.qp.logical.sys.CreateTimeSeriesOperator;
import org.apache.iotdb.db.qp.logical.sys.CreateTriggerOperator;
import org.apache.iotdb.db.qp.logical.sys.DataAuthOperator;
import org.apache.iotdb.db.qp.logical.sys.DeletePartitionOperator;
import org.apache.iotdb.db.qp.logical.sys.DeleteStorageGroupOperator;
import org.apache.iotdb.db.qp.logical.sys.DeleteTimeSeriesOperator;
import org.apache.iotdb.db.qp.logical.sys.DropContinuousQueryOperator;
import org.apache.iotdb.db.qp.logical.sys.DropFunctionOperator;
import org.apache.iotdb.db.qp.logical.sys.DropTriggerOperator;
import org.apache.iotdb.db.qp.logical.sys.FlushOperator;
import org.apache.iotdb.db.qp.logical.sys.KillQueryOperator;
import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator;
import org.apache.iotdb.db.qp.logical.sys.LoadConfigurationOperator.LoadConfigurationOperatorType;
import org.apache.iotdb.db.qp.logical.sys.LoadDataOperator;
import org.apache.iotdb.db.qp.logical.sys.LoadFilesOperator;
......@@ -1998,6 +1977,12 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
}
}
// Show Query Resource
@Override
public Operator visitShowQueryResource(IoTDBSqlParser.ShowQueryResourceContext ctx) {
return new ShowQueryResourceOperate(SQLConstant.TOK_SHOW_QUERY_RESOURCE);
}
// Show Query Processlist
@Override
......
......@@ -41,6 +41,7 @@ public class FileReaderManager {
private static final Logger logger = LoggerFactory.getLogger(FileReaderManager.class);
private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
/** max number of file streams being cached, must be lower than 65535. */
private static final int MAX_CACHED_FILE_SIZE = 30000;
......@@ -241,6 +242,17 @@ public class FileReaderManager {
|| (!isClosed && unclosedFileReaderMap.containsKey(tsFile.getTsFilePath()));
}
public synchronized void writeFileReferenceInfo() {
DEBUG_LOGGER.info("[closedReferenceMap]\n");
for (Map.Entry<String, AtomicInteger> entry : closedReferenceMap.entrySet()) {
DEBUG_LOGGER.info(String.format("\t%s: %d\n", entry.getKey(), entry.getValue().get()));
}
DEBUG_LOGGER.info("[unclosedReferenceMap]\n");
for (Map.Entry<String, AtomicInteger> entry : unclosedReferenceMap.entrySet()) {
DEBUG_LOGGER.info(String.format("\t%s: %d", entry.getKey(), entry.getValue().get()));
}
}
private static class FileReaderManagerHelper {
private static final FileReaderManager INSTANCE = new FileReaderManager();
......
......@@ -21,9 +21,13 @@ package org.apache.iotdb.db.query.control;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
......@@ -33,6 +37,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class QueryFileManager {
private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");
/** Map<queryId, Map<filePath,filePath>> */
private Map<Long, Map<TsFileResource, TsFileResource>> sealedFilePathsMap;
......@@ -126,4 +132,27 @@ public class QueryFileManager {
return k;
});
}
public void writeQueryFileInfo() {
DEBUG_LOGGER.info("[Query Sealed File Info]\n");
for (Map.Entry<Long, Map<TsFileResource, TsFileResource>> entry :
sealedFilePathsMap.entrySet()) {
long queryId = entry.getKey();
Set<TsFileResource> tsFileResources = entry.getValue().keySet();
DEBUG_LOGGER.info(String.format("\t[queryId: %d]\n", queryId));
for (TsFileResource tsFileResource : tsFileResources) {
DEBUG_LOGGER.info(String.format("\t\t%s\n", tsFileResource.getTsFile().getAbsolutePath()));
}
}
DEBUG_LOGGER.info("[Query Unsealed File Info]\n");
for (Map.Entry<Long, Map<TsFileResource, TsFileResource>> entry :
unsealedFilePathsMap.entrySet()) {
long queryId = entry.getKey();
Set<TsFileResource> tsFileResources = entry.getValue().keySet();
DEBUG_LOGGER.info(String.format("\t[queryId: %d]\n", queryId));
for (TsFileResource tsFileResource : tsFileResources) {
DEBUG_LOGGER.info(String.format("\t\t%s\n", tsFileResource.getTsFile().getAbsolutePath()));
}
}
}
}
......@@ -228,6 +228,10 @@ public class QueryResourceManager {
cachedQueryDataSourcesMap.remove(queryId);
}
public void writeQueryFileInfo() {
filePathsManager.writeQueryFileInfo();
}
public QueryFileManager getQueryFileManager() {
return filePathsManager;
}
......
......@@ -95,6 +95,7 @@ public class TsFileResourceManager {
throw new RuntimeException("Can't degrade any more");
}
long memoryReduce = tsFileResource.degradeTimeIndex();
logger.info("Degrade tsfile resource {}", tsFileResource.getTsFilePath());
releaseTimeIndexMemCost(memoryReduce);
// add the polled tsFileResource to the priority queue
sealedTsFileResources.add(tsFileResource);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册