提交 6eb95cc7 编写于 作者: J JackieTien97 提交者: Jialin Qiao

fix deadlock after showing latest timeseries

上级 b2b6d325
......@@ -67,7 +67,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.utils.RandomDeleteCache;
import org.apache.iotdb.db.utils.TestOnly;
......@@ -830,8 +829,8 @@ public class MManager {
}
}
private List<ShowTimeSeriesResult> showTimeseriesWithIndex(ShowTimeSeriesPlan plan)
throws MetadataException {
private List<ShowTimeSeriesResult> showTimeseriesWithIndex(ShowTimeSeriesPlan plan,
QueryContext context) throws MetadataException {
lock.readLock().lock();
try {
if (!tagIndex.containsKey(plan.getKey())) {
......@@ -861,10 +860,8 @@ public class MManager {
// if ordered by heat, we sort all the timeseries by the descending order of the last insert timestamp
if (plan.isOrderByHeat()) {
QueryContext queryContext = new QueryContext(
QueryResourceManager.getInstance().assignQueryId(true));
allMatchedNodes = allMatchedNodes.stream().sorted(Comparator
.comparingLong((MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, queryContext))
.comparingLong((MeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context))
.reversed().thenComparing(MNode::getFullPath)).collect(toList());
} else {
// otherwise, we just sort them by the alphabetical order
......@@ -926,13 +923,13 @@ public class MManager {
return true;
}
public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan)
public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryContext context)
throws MetadataException {
// show timeseries with index
if (plan.getKey() != null && plan.getValue() != null) {
return showTimeseriesWithIndex(plan);
return showTimeseriesWithIndex(plan, context);
} else {
return showTimeseriesWithoutIndex(plan);
return showTimeseriesWithoutIndex(plan, context);
}
}
......@@ -941,13 +938,13 @@ public class MManager {
*
* @param plan show time series query plan
*/
private List<ShowTimeSeriesResult> showTimeseriesWithoutIndex(ShowTimeSeriesPlan plan)
throws MetadataException {
private List<ShowTimeSeriesResult> showTimeseriesWithoutIndex(ShowTimeSeriesPlan plan,
QueryContext context) throws MetadataException {
lock.readLock().lock();
List<String[]> ans;
try {
if (plan.isOrderByHeat()) {
ans = mtree.getAllMeasurementSchemaByHeatOrder(plan);
ans = mtree.getAllMeasurementSchemaByHeatOrder(plan, context);
} else {
ans = mtree.getAllMeasurementSchema(plan);
}
......
......@@ -66,7 +66,6 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
......@@ -644,17 +643,14 @@ public class MTree implements Serializable {
*
* <p>result: [name, alias, storage group, dataType, encoding, compression, offset]
*/
List<String[]> getAllMeasurementSchemaByHeatOrder(ShowTimeSeriesPlan plan)
throws MetadataException {
List<String[]> getAllMeasurementSchemaByHeatOrder(ShowTimeSeriesPlan plan,
QueryContext queryContext) throws MetadataException {
String[] nodes = MetaUtils.getNodeNames(plan.getPath().getFullPath());
if (nodes.length == 0 || !nodes[0].equals(root.getName())) {
throw new IllegalPathException(plan.getPath().getFullPath());
}
List<String[]> allMatchedNodes = new ArrayList<>();
QueryContext queryContext = new QueryContext(
QueryResourceManager.getInstance().assignQueryId(true));
findPath(root, nodes, 1, allMatchedNodes, false, true, queryContext);
Stream<String[]> sortedStream = allMatchedNodes.stream().sorted(
......
......@@ -173,7 +173,7 @@ public class PlanExecutor implements IPlanExecutor {
} else if (queryPlan instanceof AuthorPlan) {
return processAuthorQuery((AuthorPlan) queryPlan);
} else if (queryPlan instanceof ShowPlan) {
return processShowQuery((ShowPlan) queryPlan);
return processShowQuery((ShowPlan) queryPlan, context);
} else {
throw new QueryProcessException(String.format("Unrecognized query plan %s", queryPlan));
}
......@@ -259,7 +259,8 @@ public class PlanExecutor implements IPlanExecutor {
(storageGroupName, partitionId) ->
storageGroupName.equals(((DeletePartitionPlan) plan).getStorageGroupName())
&& p.getPartitionId().contains(partitionId);
StorageEngine.getInstance().removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
StorageEngine.getInstance()
.removePartitions(((DeletePartitionPlan) plan).getStorageGroupName(), filter);
return true;
case CREATE_SCHEMA_SNAPSHOT:
operateCreateSnapshot();
......@@ -348,7 +349,7 @@ public class PlanExecutor implements IPlanExecutor {
return new AlignByDeviceDataSet(plan, context, router);
}
protected QueryDataSet processShowQuery(ShowPlan showPlan)
protected QueryDataSet processShowQuery(ShowPlan showPlan, QueryContext context)
throws QueryProcessException, MetadataException {
switch (showPlan.getShowContentType()) {
case TTL:
......@@ -360,7 +361,7 @@ public class PlanExecutor implements IPlanExecutor {
case VERSION:
return processShowVersion();
case TIMESERIES:
return processShowTimeseries((ShowTimeSeriesPlan) showPlan);
return processShowTimeseries((ShowTimeSeriesPlan) showPlan, context);
case STORAGE_GROUP:
return processShowStorageGroup();
case DEVICES:
......@@ -507,15 +508,15 @@ public class PlanExecutor implements IPlanExecutor {
return listDataSet;
}
private QueryDataSet processShowTimeseries(ShowTimeSeriesPlan showTimeSeriesPlan)
throws MetadataException {
List<ShowTimeSeriesResult> timeseriesList = showTimeseries(showTimeSeriesPlan);
return QueryUtils.getQueryDataSet(timeseriesList, showTimeSeriesPlan);
private QueryDataSet processShowTimeseries(ShowTimeSeriesPlan showTimeSeriesPlan,
QueryContext context) throws MetadataException {
List<ShowTimeSeriesResult> timeseriesList = showTimeseries(showTimeSeriesPlan, context);
return QueryUtils.getQueryDataSet(timeseriesList, showTimeSeriesPlan, context);
}
protected List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan)
protected List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryContext context)
throws MetadataException {
return IoTDB.metaManager.showTimeseries(plan);
return IoTDB.metaManager.showTimeseries(plan, context);
}
protected List<StorageGroupMNode> getAllStorageGroupNodes() {
......@@ -846,8 +847,10 @@ public class PlanExecutor implements IPlanExecutor {
}
protected MeasurementSchema[] getSeriesSchemas(InsertPlan insertPlan)
throws MetadataException {
return mManager.getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(), insertPlan);
throws MetadataException {
return mManager
.getSeriesSchemasAndReadLockDevice(insertPlan.getDeviceId(), insertPlan.getMeasurements(),
insertPlan);
}
@Override
......
......@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
......@@ -42,13 +43,15 @@ public class ShowTimeseriesDataSet extends QueryDataSet {
private final ShowTimeSeriesPlan plan;
private List<RowRecord> result = new ArrayList<>();
private int index = 0;
private QueryContext context;
public boolean hasLimit = true;
public ShowTimeseriesDataSet(List<Path> paths, List<TSDataType> dataTypes,
ShowTimeSeriesPlan showTimeSeriesPlan) {
ShowTimeSeriesPlan showTimeSeriesPlan, QueryContext context) {
super(paths, dataTypes);
this.plan = showTimeSeriesPlan;
this.context = context;
}
@Override
......@@ -57,7 +60,7 @@ public class ShowTimeseriesDataSet extends QueryDataSet {
plan.setOffset(plan.getOffset() + plan.getLimit());
try {
List<ShowTimeSeriesResult> showTimeSeriesResults = MManager.getInstance()
.showTimeseries(plan);
.showTimeseries(plan, context);
result = transferShowTimeSeriesResultToRecordList(showTimeSeriesResults);
index = 0;
} catch (MetadataException e) {
......
......@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.query.dataset.ShowTimeseriesDataSet;
import org.apache.iotdb.db.query.filter.TsFileFilter;
......@@ -136,12 +137,12 @@ public class QueryUtils {
}
public static QueryDataSet getQueryDataSet(List<ShowTimeSeriesResult> timeseriesList,
ShowTimeSeriesPlan showTimeSeriesPlan) {
ShowTimeSeriesPlan showTimeSeriesPlan, QueryContext context) {
List<Path> paths = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
constructPathAndDataTypes(paths, dataTypes, timeseriesList);
ShowTimeseriesDataSet showTimeseriesDataSet = new ShowTimeseriesDataSet(paths, dataTypes,
showTimeSeriesPlan);
showTimeSeriesPlan, context);
showTimeseriesDataSet.hasLimit = showTimeSeriesPlan.hasLimit();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册