未验证 提交 2675ebf2 编写于 作者: L liuxuxin 提交者: GitHub

Add workload manager (#2520)

Add workload manager
上级 338c7f05
......@@ -165,7 +165,7 @@
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${start-class}</mainClass>
</transformer>
......
......@@ -223,7 +223,7 @@
</goals>
</pluginExecutionFilter>
<action>
<ignore />
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
......
......@@ -139,7 +139,7 @@
<sonar.exclusions>**/generated-sources</sonar.exclusions>
<!-- By default, the argLine is empty-->
<gson.version>2.8.6</gson.version>
<argLine />
<argLine/>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
......@@ -599,7 +599,7 @@
<id>enforce-version-convergence</id>
<configuration>
<rules>
<dependencyConvergence />
<dependencyConvergence/>
</rules>
</configuration>
<goals>
......@@ -645,7 +645,7 @@
</requireJavaVersion>
<!-- Disabled for now as it breaks the ability to build single modules -->
<!--reactorModuleConvergence/-->
<banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
<banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
</rules>
</configuration>
</execution>
......
......@@ -251,6 +251,25 @@
<level>INFO</level>
</filter>
</appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="QUERY_RECORD">
<file>${IOTDB_HOME}/logs/log_query_record.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${IOTDB_HOME}/logs/log-query-record-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>300MB</maxFileSize>
</triggeringPolicy>
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d [%t] %-5p %C:%L - %m %n</pattern>
<charset>utf-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<root level="info">
<appender-ref ref="FILEDEBUG"/>
<appender-ref ref="FILEWARN"/>
......@@ -280,4 +299,7 @@
<logger level="info" name="QUERY_FREQUENCY">
<appender-ref ref="QUERY_FREQUENCY"/>
</logger>
<logger level="info" name="QUERY_RECORD">
<appender-ref ref="QUERY_RECORD"/>
</logger>
</configuration>
......@@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
......@@ -40,6 +42,9 @@ import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.db.query.workloadmanager.WorkloadManager;
import org.apache.iotdb.db.query.workloadmanager.queryrecord.GroupByQueryRecord;
import org.apache.iotdb.db.query.workloadmanager.queryrecord.QueryRecord;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
......@@ -91,16 +96,42 @@ public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
List<StorageGroupProcessor> list = StorageEngine.getInstance()
.mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()));
WorkloadManager manager = WorkloadManager.getInstance();
Map<String, List<Integer>> deviceQueryIdxMap = new HashMap<>();
try {
for (int i = 0; i < paths.size(); i++) {
PartialPath path = (PartialPath) paths.get(i);
allDataReaderList
.add(getReaderByTime(path, groupByTimePlan, dataTypes.get(i), context, null));
// Map the device id to the corresponding path indexes
if (deviceQueryIdxMap.containsKey(path.getDevice())) {
deviceQueryIdxMap.get(path.getDevice()).add(i);
} else {
deviceQueryIdxMap.put(path.getDevice(), new ArrayList<>());
deviceQueryIdxMap.get(path.getDevice()).add(i);
}
}
// Add the query records to the workload manager
for(String device: deviceQueryIdxMap.keySet()) {
List<Integer> pathIndexes = deviceQueryIdxMap.get(device);
List<String> sensors = new ArrayList<>();
List<String> ops = new ArrayList<>();
for(int idx: pathIndexes) {
PartialPath path = (PartialPath) paths.get(idx);
sensors.add(path.getMeasurement());
ops.add(groupByTimePlan.getDeduplicatedAggregations().get(idx));
}
QueryRecord record = new GroupByQueryRecord(device, sensors, ops, groupByTimePlan.getStartTime(),
groupByTimePlan.getEndTime(), groupByTimePlan.getInterval(), groupByTimePlan.getSlidingStep());
manager.addRecord(record);
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
}
}
protected TimeGenerator getTimeGenerator(IExpression expression, QueryContext context,
......
......@@ -37,6 +37,9 @@ import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.workloadmanager.WorkloadManager;
import org.apache.iotdb.db.query.workloadmanager.queryrecord.GroupByQueryRecord;
import org.apache.iotdb.db.query.workloadmanager.queryrecord.QueryRecord;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
......@@ -91,6 +94,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
.mergeLock(paths.stream().map(p -> (PartialPath) p).collect(Collectors.toList()));
try {
// init resultIndexes, group result indexes by path
WorkloadManager manager = WorkloadManager.getInstance();
Map<String, List<Integer>> deviceQueryIdxMap = new HashMap<>();
for (int i = 0; i < paths.size(); i++) {
PartialPath path = (PartialPath) paths.get(i);
if (!pathExecutors.containsKey(path)) {
......@@ -105,6 +110,28 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {
.getAggrResultByName(groupByTimePlan.getDeduplicatedAggregations().get(i),
dataTypes.get(i), ascending);
pathExecutors.get(path).addAggregateResult(aggrResult);
// Map the device id to the corresponding query indexes
if (deviceQueryIdxMap.containsKey(path.getDevice())) {
deviceQueryIdxMap.get(path.getDevice()).add(i);
} else {
deviceQueryIdxMap.put(path.getDevice(), new ArrayList<>());
deviceQueryIdxMap.get(path.getDevice()).add(i);
}
}
// Add the query records to the workload manager
for(String device: deviceQueryIdxMap.keySet()) {
List<Integer> pathIndexes = deviceQueryIdxMap.get(device);
List<String> sensors = new ArrayList<>();
List<String> ops = new ArrayList<>();
for(int idx: pathIndexes) {
PartialPath path = (PartialPath) paths.get(idx);
sensors.add(path.getMeasurement());
ops.add(groupByTimePlan.getDeduplicatedAggregations().get(idx));
}
QueryRecord record = new GroupByQueryRecord(device, sensors, ops, groupByTimePlan.getStartTime(),
groupByTimePlan.getEndTime(), groupByTimePlan.getInterval(), groupByTimePlan.getSlidingStep());
manager.addRecord(record);
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
......
......@@ -47,6 +47,9 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.db.query.workloadmanager.WorkloadManager;
import org.apache.iotdb.db.query.workloadmanager.queryrecord.AggregationQueryRecord;
import org.apache.iotdb.db.query.workloadmanager.queryrecord.QueryRecord;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
......@@ -102,11 +105,44 @@ public class AggregationExecutor {
// TODO-Cluster: group the paths by storage group to reduce communications
List<StorageGroupProcessor> list = StorageEngine.getInstance()
.mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
WorkloadManager manager = WorkloadManager.getInstance();
// DeviceID -> MeasurementID -> List<Aggregation Operation>
Map<String, Map<String, List<String>>> deviceSensorMap = new HashMap<>();
try {
for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
aggregateOneSeries(entry, aggregateResultList,
aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()), timeFilter,
context);
// Record the query in the map
if (!deviceSensorMap.containsKey(entry.getKey().getDevice())) {
deviceSensorMap.put(entry.getKey().getDevice(), new HashMap<>());
}
Map<String, List<String>> measurementOpMap = deviceSensorMap.get(entry.getKey().getDevice());
if (!measurementOpMap.containsKey(entry.getKey().getMeasurement())) {
measurementOpMap.put(entry.getKey().getMeasurement(), new ArrayList<>());
}
List<Integer> aggrIndexes = entry.getValue();
List<String> ops = measurementOpMap.get(entry.getKey().getMeasurement());
for(int idx = 0; idx < aggrIndexes.size(); ++idx) {
ops.add(aggregationPlan.getDeduplicatedAggregations().get(aggrIndexes.get(idx)));
}
}
// Put the query records into the manager
for (String device: deviceSensorMap.keySet()) {
Map<String, List<String>> measurementOpMap = deviceSensorMap.get(device);
List<String> curDeviceMeasurements = new ArrayList<>();
List<String> curDeviceOps = new ArrayList<>();
for (String measurement: measurementOpMap.keySet()) {
List<String> ops = measurementOpMap.get(measurement);
for(int i = 0; i < ops.size(); ++i) {
curDeviceMeasurements.add(measurement);
curDeviceOps.add(ops.get(i));
}
}
QueryRecord record = new AggregationQueryRecord(device, curDeviceMeasurements, curDeviceOps);
manager.addRecord(record);
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
......@@ -325,6 +361,38 @@ public class AggregationExecutor {
.getAggrResultByName(aggregations.get(i), type, ascending);
aggregateResults.add(result);
}
// Workload collection
WorkloadManager manager = WorkloadManager.getInstance();
// DeviceID -> MeasurementID -> List<Aggregation Operation>
Map<String, Map<String, List<String>>> deviceSensorMap = new HashMap<>();
// Record the query in the map
for(int i = 0; i < selectedSeries.size(); ++i) {
if (!deviceSensorMap.containsKey(selectedSeries.get(i).getDevice())) {
deviceSensorMap.put(selectedSeries.get(i).getDevice(), new HashMap<>());
}
Map<String, List<String>> measurementOpMap = deviceSensorMap.get(selectedSeries.get(i).getDevice());
if (!measurementOpMap.containsKey(selectedSeries.get(i).getMeasurement())) {
measurementOpMap.put(selectedSeries.get(i).getMeasurement(), new ArrayList<>());
}
List<String> opList = measurementOpMap.get(selectedSeries.get(i).getMeasurement());
opList.add(aggregations.get(i));
}
// Put the query records to the manager
for (String device: deviceSensorMap.keySet()) {
Map<String, List<String>> measurementOpMap = deviceSensorMap.get(device);
List<String> curDeviceMeasurements = new ArrayList<>();
List<String> curDeviceOps = new ArrayList<>();
for (String measurement: measurementOpMap.keySet()) {
List<String> ops = measurementOpMap.get(measurement);
for(int i = 0; i < ops.size(); ++i) {
curDeviceMeasurements.add(measurement);
curDeviceOps.add(ops.get(i));
}
}
QueryRecord record = new AggregationQueryRecord(device, curDeviceMeasurements, curDeviceOps);
manager.addRecord(record);
}
aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries);
return constructDataSet(aggregateResults, queryPlan);
}
......
package org.apache.iotdb.db.query.workloadmanager;
import org.apache.iotdb.db.query.workloadmanager.queryrecord.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Singleton that collects executed query records (aggregation / group-by) and
 * periodically flushes them to the dedicated "QUERY_RECORD" logback appender.
 *
 * <p>All mutating entry points are {@code synchronized}, so the in-memory
 * buffer is safe under concurrent query threads; the actual log writing is
 * handed off to a single background thread so it never blocks a query.
 */
public class WorkloadManager {

  /** In-memory buffer; swapped out wholesale when a flush is triggered. */
  private List<QueryRecord> records = new ArrayList<>();

  /** Logger bound to the QUERY_RECORD appender configured in logback.xml. */
  private static final Logger QUERY_RECORD_LOGGER = LoggerFactory.getLogger("QUERY_RECORD");

  /** Flush the buffer once it holds more than this many records. */
  private static final int RECORDS_NUM_THRESHOLD = 300;

  /** Single worker so flushes happen off the query thread, in submission order. */
  private final ExecutorService flushExecutor = Executors.newSingleThreadExecutor();

  private WorkloadManager() {
  }

  /** Initialization-on-demand holder idiom: lazy, thread-safe singleton. */
  private static class WorkloadManagerHolder {

    private static final WorkloadManager INSTANCE = new WorkloadManager();
  }

  /**
   * Writes one batch of records to the query-record log. Declared static so it
   * does not retain a hidden reference to the enclosing singleton.
   */
  private static class QueryRecordFlushTask implements Runnable {

    private final List<QueryRecord> records;
    private final Logger logger;

    private QueryRecordFlushTask(List<QueryRecord> records, Logger logger) {
      this.records = records;
      this.logger = logger;
    }

    @Override
    public void run() {
      for (QueryRecord record : records) {
        logger.info(record.getSql());
      }
    }
  }

  public static WorkloadManager getInstance() {
    return WorkloadManagerHolder.INSTANCE;
  }

  /**
   * Convenience overload: build and buffer an aggregation query record.
   *
   * @param device device path the query targeted
   * @param sensors measurement names, parallel to {@code ops}
   * @param ops aggregation operator names, parallel to {@code sensors}
   */
  public synchronized void addAggregationRecord(String device, List<String> sensors,
      List<String> ops) {
    this.addRecord(new AggregationQueryRecord(device, sensors, ops));
  }

  /**
   * Convenience overload: build and buffer a group-by query record.
   */
  public synchronized void addGroupByQueryRecord(String device, List<String> sensors,
      List<String> ops, long startTime, long endTime, long interval, long slidingStep) {
    this.addRecord(
        new GroupByQueryRecord(device, sensors, ops, startTime, endTime, interval, slidingStep));
  }

  /**
   * Buffers one record; when the buffer exceeds {@link #RECORDS_NUM_THRESHOLD}
   * the full batch is handed to the background flusher and a fresh buffer is
   * installed, so the flushed list is never mutated concurrently.
   */
  public synchronized void addRecord(QueryRecord record) {
    records.add(record);
    if (records.size() > RECORDS_NUM_THRESHOLD) {
      flushExecutor.execute(new QueryRecordFlushTask(records, QUERY_RECORD_LOGGER));
      this.records = new ArrayList<>();
    }
  }
}
package org.apache.iotdb.db.query.workloadmanager.queryrecord;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
 * A {@link QueryRecord} describing a plain aggregation query
 * ({@code SELECT op(sensor), ... FROM device}); the SQL text is reconstructed
 * eagerly at construction time.
 */
public class AggregationQueryRecord extends QueryRecord {

  /**
   * @param device full device path the query targeted
   * @param sensorList measurement names, parallel to {@code opList}
   * @param opList aggregation operator names, parallel to {@code sensorList}
   */
  public AggregationQueryRecord(String device, List<String> sensorList, List<String> opList) {
    // Defensive copies: callers may reuse/mutate their lists afterwards.
    sensors = new ArrayList<>(sensorList);
    ops = new ArrayList<>(opList);
    this.timestamp = System.currentTimeMillis();
    this.device = device;
    this.recordType = QueryRecordType.AGGREGATION;
    recoverSql();
    recoverSqlWithTimestamp();
  }

  /** Rebuilds "SELECT op(sensor), ... FROM device" from the stored fields. */
  private void recoverSql() {
    StringBuilder sb = new StringBuilder("SELECT ");
    for (int i = 0; i < sensors.size(); ++i) {
      sb.append(ops.get(i)).append("(").append(sensors.get(i)).append(")");
      if (i != sensors.size() - 1) {
        sb.append(", ");
      }
    }
    sb.append(" FROM ").append(device);
    sql = sb.toString();
  }

  /** Prefixes the reconstructed SQL with the record's creation time. */
  private void recoverSqlWithTimestamp() {
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    sqlWithTimestamp = df.format(new Date(timestamp)) + " " + sql;
  }

  @Override
  public String getSql() {
    return sql;
  }

  @Override
  public String getSqlWithTimestamp() {
    return sqlWithTimestamp;
  }
}
package org.apache.iotdb.db.query.workloadmanager.queryrecord;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
 * A {@link QueryRecord} describing a group-by-time aggregation query
 * ({@code SELECT op(sensor), ... FROM device GROUP BY ([start, end), interval[, slidingStep])});
 * the SQL text is reconstructed eagerly at construction time.
 */
public class GroupByQueryRecord extends QueryRecord {

  long startTime;
  long endTime;
  long interval;
  long slidingStep;

  /**
   * @param device full device path the query targeted
   * @param sensors measurement names, parallel to {@code ops}
   * @param ops aggregation operator names, parallel to {@code sensors}
   * @param startTime group-by window start (epoch millis, inclusive)
   * @param endTime group-by window end (epoch millis, exclusive)
   * @param interval length of each aggregation bucket, in millis
   * @param slidingStep step between bucket starts, in millis
   */
  public GroupByQueryRecord(String device, List<String> sensors, List<String> ops,
      long startTime, long endTime, long interval, long slidingStep) {
    // Defensive copies: callers may reuse/mutate their lists afterwards.
    this.sensors = new ArrayList<>(sensors);
    this.ops = new ArrayList<>(ops);
    this.device = device;
    this.startTime = startTime;
    this.endTime = endTime;
    this.interval = interval;
    this.slidingStep = slidingStep;
    this.recordType = QueryRecordType.GROUP_BY;
    this.timestamp = System.currentTimeMillis();
    recoverSql();
    recoverSqlWithTimestamp();
  }

  /** Tumbling-window convenience overload: sliding step equals the interval. */
  public GroupByQueryRecord(String device, List<String> sensors, List<String> ops,
      long startTime, long endTime, long interval) {
    this(device, sensors, ops, startTime, endTime, interval, interval);
  }

  /**
   * Rebuilds the SQL text from the stored fields. The sliding step clause is
   * emitted only when it differs from the interval (matching IoTDB syntax).
   */
  private void recoverSql() {
    StringBuilder sb = new StringBuilder("SELECT ");
    for (int i = 0; i < sensors.size(); ++i) {
      sb.append(ops.get(i)).append("(").append(sensors.get(i)).append(")");
      if (i != sensors.size() - 1) {
        sb.append(", ");
      }
    }
    sb.append(" FROM ").append(device);
    sb.append(" GROUP BY ([");
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    sb.append(df.format(new Date(startTime)));
    sb.append(", ");
    sb.append(df.format(new Date(endTime)));
    sb.append("), ").append(interval).append("ms");
    if (interval != slidingStep) {
      sb.append(", ").append(slidingStep).append("ms");
    }
    sb.append(")");
    sql = sb.toString();
  }

  /** Prefixes the reconstructed SQL with the record's creation time. */
  private void recoverSqlWithTimestamp() {
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    sqlWithTimestamp = df.format(new Date(timestamp)) + " " + sql;
  }

  @Override
  public String getSql() {
    return sql;
  }

  @Override
  public String getSqlWithTimestamp() {
    return sqlWithTimestamp;
  }
}
\ No newline at end of file
package org.apache.iotdb.db.query.workloadmanager.queryrecord;
import java.util.List;
/**
 * Base class for a recorded query in the workload manager: the target device,
 * the queried sensors with their aggregation operators, and the reconstructed
 * SQL text with its creation timestamp.
 */
public abstract class QueryRecord {
// Cached hash code (0 = not computed); not used in this file — presumably for
// subclass hashCode() caching, TODO confirm against the rest of the package.
protected int hashcode = 0;
// Full device path the query targeted.
protected String device;
// Measurement names, parallel to ops.
protected List<String> sensors;
// Aggregation operator names, parallel to sensors.
protected List<String> ops;
// Which concrete kind of query this record describes.
protected QueryRecordType recordType;
// SQL text reconstructed by the subclass constructor.
String sql;
// Same SQL prefixed with the formatted creation time.
String sqlWithTimestamp;
// Record creation time, epoch millis.
long timestamp;
/** Returns the reconstructed SQL statement. */
public abstract String getSql();
/** Returns the reconstructed SQL prefixed with the creation timestamp. */
public abstract String getSqlWithTimestamp();
public QueryRecordType getRecordType() {
return recordType;
}
public List<String> getSensors() {
return sensors;
}
public String getDevice() {
return device;
}
}
package org.apache.iotdb.db.query.workloadmanager.queryrecord;
/** Kinds of queries tracked by the workload manager. */
public enum QueryRecordType {
// GROUP_BY: group-by-time aggregation; AGGREGATION: plain aggregation query.
GROUP_BY, AGGREGATION
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册