未验证 提交 62f009c8 编写于 作者: Y yhwang-hbl 提交者: GitHub

[IOTDB-873] Add count devices DDL (#1705)

上级 168cbbd4
......@@ -79,6 +79,7 @@ statement
| TRACING ON #tracingOn
| TRACING OFF #tracingOff
| COUNT TIMESERIES prefixPath? (GROUP BY LEVEL OPERATOR_EQ INT)? #countTimeseries
| COUNT DEVICES prefixPath? #countDevices
| COUNT NODES prefixPath LEVEL OPERATOR_EQ INT #countNodes
| LOAD CONFIGURATION (MINUS GLOBAL)? #loadConfigurationStatement
| {hasSingleQuoteString = true;} LOAD stringLiteral autoCreateSchema?#loadFiles
......
......@@ -840,6 +840,18 @@ public class MManager {
}
}
/**
* To calculate the count of devices for given prefix path.
*/
public int getDevicesNum(PartialPath prefixPath) throws MetadataException {
lock.readLock().lock();
try {
return mtree.getDevicesNum(prefixPath);
} finally {
lock.readLock().unlock();
}
}
/**
* To calculate the count of nodes in the given level for given prefix path.
*
......
......@@ -649,6 +649,19 @@ public class MTree implements Serializable {
return getCount(root, nodes, 1);
}
/**
* Get the count of devices under the given prefix path.
*
* @param prefixPath a prefix path or a full path, may contain '*'.
*/
int getDevicesNum(PartialPath prefixPath) throws MetadataException {
String[] nodes = prefixPath.getNodes();
if (nodes.length == 0 || !nodes[0].equals(root.getName())) {
throw new IllegalPathException(prefixPath.getFullPath());
}
return getDevicesCount(root, nodes, 1);
}
/**
* Get the count of nodes in the given level under the given prefix path.
*/
......@@ -695,6 +708,33 @@ public class MTree implements Serializable {
}
}
/**
* Traverse the MTree to get the count of devices.
*/
private int getDevicesCount(MNode node, String[] nodes, int idx) throws MetadataException {
String nodeReg = MetaUtils.getNodeRegByIdx(idx, nodes);
int cnt = 0;
if (!(PATH_WILDCARD).equals(nodeReg)) {
if (node.hasChild(nodeReg)) {
if (node.getChild(nodeReg) instanceof MeasurementMNode && idx >= nodes.length) {
cnt++;
} else {
cnt += getDevicesCount(node.getChild(nodeReg), nodes, idx + 1);
}
}
} else {
boolean deviceAdded = false;
for (MNode child : node.getChildren().values()) {
if (child instanceof MeasurementMNode && !deviceAdded) {
cnt++;
deviceAdded = true;
}
cnt += getDevicesCount(child, nodes, idx + 1);
}
}
return cnt;
}
/**
* Traverse the MTree to get the count of timeseries in the given level.
*
......
......@@ -158,6 +158,8 @@ public class SQLConstant {
public static final int TOK_CREATE_SCHEMA_SNAPSHOT = 89;
public static final int TOK_TRACING = 91;
public static final int TOK_COUNT_DEVICES = 92;
public static final Map<Integer, String> tokenSymbol = new HashMap<>();
public static final Map<Integer, String> tokenNames = new HashMap<>();
public static final Map<Integer, Integer> reverseWords = new HashMap<>();
......
......@@ -383,6 +383,8 @@ public class PlanExecutor implements IPlanExecutor {
return processCountTimeSeries((CountPlan) showPlan);
case COUNT_NODE_TIMESERIES:
return processCountNodeTimeSeries((CountPlan) showPlan);
case COUNT_DEVICES:
return processCountDevices((CountPlan) showPlan);
case COUNT_NODES:
return processCountNodes((CountPlan) showPlan);
case MERGE_STATUS:
......@@ -428,6 +430,24 @@ public class PlanExecutor implements IPlanExecutor {
return listDataSet;
}
private QueryDataSet processCountDevices(CountPlan countPlan) throws MetadataException {
int num = getDevicesNum(countPlan.getPath());
SingleDataSet singleDataSet =
new SingleDataSet(
Collections.singletonList(new PartialPath(COLUMN_DEVICES, false)),
Collections.singletonList(TSDataType.INT32));
Field field = new Field(TSDataType.INT32);
field.setIntV(num);
RowRecord record = new RowRecord(0);
record.addField(field);
singleDataSet.setRecord(record);
return singleDataSet;
}
private int getDevicesNum(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getDevicesNum(path);
}
protected int getPathsNum(PartialPath path) throws MetadataException {
return IoTDB.metaManager.getAllTimeseriesCount(path);
}
......
......@@ -50,7 +50,7 @@ public class ShowPlan extends PhysicalPlan {
public enum ShowContentType {
DYNAMIC_PARAMETER, FLUSH_TASK_INFO, TTL, VERSION, TIMESERIES, STORAGE_GROUP, CHILD_PATH, DEVICES,
COUNT_TIMESERIES, COUNT_NODE_TIMESERIES, COUNT_NODES, MERGE_STATUS
COUNT_TIMESERIES, COUNT_NODE_TIMESERIES, COUNT_NODES, MERGE_STATUS, COUNT_DEVICES
}
}
......@@ -84,6 +84,7 @@ import org.apache.iotdb.db.qp.strategy.SqlBaseParser.AsElementContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.AttributeClauseContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.AttributeClausesContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.ConstantContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountDevicesContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountNodesContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CountTimeseriesContext;
import org.apache.iotdb.db.qp.strategy.SqlBaseParser.CreateRoleContext;
......@@ -220,6 +221,14 @@ public class LogicalGenerator extends SqlBaseBaseListener {
}
}
@Override
public void enterCountDevices(CountDevicesContext ctx) {
super.enterCountDevices(ctx);
PrefixPathContext pathContext = ctx.prefixPath();
PartialPath path = (pathContext != null ? parsePrefixPath(pathContext) : new PartialPath(SQLConstant.getSingleRootArray()));
initializedOperator = new CountOperator(SQLConstant.TOK_COUNT_DEVICES, path);
}
@Override
public void enterFlush(FlushContext ctx) {
super.enterFlush(ctx);
......
......@@ -239,6 +239,9 @@ public class PhysicalGenerator {
case SQLConstant.TOK_DEVICES:
return new ShowDevicesPlan(
ShowContentType.DEVICES, ((ShowDevicesOperator) operator).getPath());
case SQLConstant.TOK_COUNT_DEVICES:
return new CountPlan(
ShowContentType.COUNT_DEVICES, ((CountOperator) operator).getPath());
case SQLConstant.TOK_COUNT_NODE_TIMESERIES:
return new CountPlan(
ShowContentType.COUNT_NODE_TIMESERIES,
......
......@@ -103,6 +103,10 @@ class StaticResps {
Arrays.asList(COLUMN_COLUMN, COLUMN_COUNT),
Arrays.asList(TSDataType.TEXT.toString(), TSDataType.TEXT.toString()));
static final TSExecuteStatementResp COUNT_DEVICES = getNoTimeExecuteResp(
Collections.singletonList(COLUMN_COUNT),
Collections.singletonList(TSDataType.INT32.toString()));
static final TSExecuteStatementResp COUNT_NODES = getNoTimeExecuteResp(
Collections.singletonList(COLUMN_COUNT),
Collections.singletonList(TSDataType.INT32.toString()));
......
......@@ -726,6 +726,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return StaticResps.COUNT_NODES;
case COUNT_TIMESERIES:
return StaticResps.COUNT_TIMESERIES;
case COUNT_DEVICES:
return StaticResps.COUNT_DEVICES;
case MERGE_STATUS:
return StaticResps.MERGE_STATUS_RESP;
default:
......
......@@ -27,16 +27,19 @@ import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.fail;
/**
* Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
* defined as integration test.
* Notice that, all test begins with "IoTDB" is integration test. All test which will start the
* IoTDB server should be defined as integration test.
*/
public class IoTDBMetadataFetchIT {
private DatabaseMetaData databaseMetaData;
private static final Logger logger = LoggerFactory.getLogger(IoTDBMetadataFetchIT.class);
private static void insertSQL() throws ClassNotFoundException, SQLException {
Class.forName(Config.JDBC_DRIVER_NAME);
......@@ -46,6 +49,7 @@ public class IoTDBMetadataFetchIT {
String[] insertSqls = new String[]{"SET STORAGE GROUP TO root.ln.wf01.wt01",
"CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE = BOOLEAN, ENCODING = PLAIN",
"CREATE TIMESERIES root.ln.wf01.wt01.status.s1 WITH DATATYPE = BOOLEAN, ENCODING = PLAIN",
"CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE = FLOAT, ENCODING = RLE, "
+ "compressor = SNAPPY, MAX_POINT_NUMBER = 3"};
......@@ -53,7 +57,7 @@ public class IoTDBMetadataFetchIT {
statement.execute(sql);
}
} catch (Exception e) {
e.printStackTrace();
logger.error("insertSQL() failed", e);
fail(e.getMessage());
}
}
......@@ -85,20 +89,23 @@ public class IoTDBMetadataFetchIT {
"show timeseries root.a.b", // nonexistent timeseries, thus returning ""
};
String[] standards = new String[]{
"root.ln.wf01.wt01.status,null,root.ln.wf01.wt01,BOOLEAN,PLAIN,SNAPPY,\n",
"root.ln.wf01.wt01.status,null,root.ln.wf01.wt01,BOOLEAN,PLAIN,SNAPPY,\n"
+ "root.ln.wf01.wt01.status.s1,null,root.ln.wf01.wt01,BOOLEAN,PLAIN,SNAPPY,\n",
"root.ln.wf01.wt01.status,null,root.ln.wf01.wt01,BOOLEAN,PLAIN,SNAPPY,\n"
+ "root.ln.wf01.wt01.status.s1,null,root.ln.wf01.wt01,BOOLEAN,PLAIN,SNAPPY,\n"
+ "root.ln.wf01.wt01.temperature,null,root.ln.wf01.wt01,FLOAT,RLE,SNAPPY,\n",
"root.ln.wf01.wt01.status,null,root.ln.wf01.wt01,BOOLEAN,PLAIN,SNAPPY,\n"
+ "root.ln.wf01.wt01.status.s1,null,root.ln.wf01.wt01,BOOLEAN,PLAIN,SNAPPY,\n"
+ "root.ln.wf01.wt01.temperature,null,root.ln.wf01.wt01,FLOAT,RLE,SNAPPY,\n",
"root.ln.wf01.wt01.status,null,root.ln.wf01.wt01,BOOLEAN,PLAIN,SNAPPY,\n"
+ "root.ln.wf01.wt01.temperature,null,root.ln.wf01.wt01,FLOAT,RLE,SNAPPY,\n",
"",
+ "root.ln.wf01.wt01.status.s1,null,root.ln.wf01.wt01,BOOLEAN,PLAIN,SNAPPY,\n"
+ "root.ln.wf01.wt01.temperature,null,root.ln.wf01.wt01,FLOAT,RLE,SNAPPY,\n",
""};
""
};
for (int n = 0; n < sqls.length; n++) {
String sql = sqls[n];
String standard = standards[n];
......@@ -118,7 +125,7 @@ public class IoTDBMetadataFetchIT {
}
Assert.assertEquals(standard, builder.toString());
} catch (SQLException e) {
e.printStackTrace();
logger.error("showTimeseriesTest() failed", e);
fail(e.getMessage());
}
}
......@@ -152,7 +159,7 @@ public class IoTDBMetadataFetchIT {
}
Assert.assertEquals(builder.toString(), standard);
} catch (SQLException e) {
e.printStackTrace();
logger.error("showStorageGroupTest() failed", e);
fail(e.getMessage());
}
}
......@@ -170,7 +177,7 @@ public class IoTDBMetadataFetchIT {
showTimeseriesInJson();
} catch (Exception e) {
e.printStackTrace();
logger.error("databaseMetaDataTest() failed", e);
fail(e.getMessage());
} finally {
if (connection != null) {
......@@ -188,8 +195,8 @@ public class IoTDBMetadataFetchIT {
String sql = "show version";
try {
boolean hasResultSet = statement.execute(sql);
if(hasResultSet) {
try(ResultSet resultSet = statement.getResultSet()) {
if (hasResultSet) {
try (ResultSet resultSet = statement.getResultSet()) {
resultSet.next();
Assert.assertEquals(resultSet.getString(1), IoTDBConstant.VERSION);
}
......@@ -207,7 +214,7 @@ public class IoTDBMetadataFetchIT {
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
String[] sqls = new String[]{"show devices root.ln"};
String[] standards = new String[]{"root.ln.wf01.wt01,\n"};
String[] standards = new String[]{"root.ln.wf01.wt01,\n" + "root.ln.wf01.wt01.status,\n"};
for (int n = 0; n < sqls.length; n++) {
String sql = sqls[n];
String standard = standards[n];
......@@ -227,7 +234,7 @@ public class IoTDBMetadataFetchIT {
}
Assert.assertEquals(builder.toString(), standard);
} catch (SQLException e) {
e.printStackTrace();
logger.error("showDevices() failed", e);
fail(e.getMessage());
}
}
......@@ -261,7 +268,7 @@ public class IoTDBMetadataFetchIT {
}
Assert.assertEquals(builder.toString(), standard);
} catch (SQLException e) {
e.printStackTrace();
logger.error("showChildPaths() failed", e);
fail(e.getMessage());
}
}
......@@ -275,6 +282,40 @@ public class IoTDBMetadataFetchIT {
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
String[] sqls = new String[]{"COUNT TIMESERIES root.ln", "COUNT TIMESERIES"};
String[] standards = new String[]{"3,\n", "3,\n"};
for (int n = 0; n < sqls.length; n++) {
String sql = sqls[n];
String standard = standards[n];
StringBuilder builder = new StringBuilder();
try {
boolean hasResultSet = statement.execute(sql);
if (hasResultSet) {
try (ResultSet resultSet = statement.getResultSet()) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
builder.append(resultSet.getString(i)).append(",");
}
builder.append("\n");
}
}
}
Assert.assertEquals(standard, builder.toString());
} catch (SQLException e) {
logger.error("showCountTimeSeries() failed", e);
fail(e.getMessage());
}
}
}
}
@Test
public void showCountDevices() throws SQLException, ClassNotFoundException {
Class.forName(Config.JDBC_DRIVER_NAME);
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
String[] sqls = new String[]{"COUNT DEVICES root.ln", "COUNT DEVICES"};
String[] standards = new String[]{"2,\n", "2,\n"};
for (int n = 0; n < sqls.length; n++) {
String sql = sqls[n];
......@@ -295,7 +336,7 @@ public class IoTDBMetadataFetchIT {
}
Assert.assertEquals(standard, builder.toString());
} catch (SQLException e) {
e.printStackTrace();
logger.error("showCountDevices() failed", e);
fail(e.getMessage());
}
}
......@@ -309,7 +350,7 @@ public class IoTDBMetadataFetchIT {
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
String[] sqls = new String[]{"COUNT TIMESERIES root group by level=1"};
String[] standards = new String[]{"root.ln,2,\n"};
String[] standards = new String[]{"root.ln,3,\n"};
for (int n = 0; n < sqls.length; n++) {
String sql = sqls[n];
String standard = standards[n];
......@@ -329,7 +370,7 @@ public class IoTDBMetadataFetchIT {
}
Assert.assertEquals(builder.toString(), standard);
} catch (SQLException e) {
e.printStackTrace();
logger.error("showCountTimeSeriesGroupBy() failed", e);
fail(e.getMessage());
}
}
......@@ -363,7 +404,7 @@ public class IoTDBMetadataFetchIT {
}
Assert.assertEquals(builder.toString(), standard);
} catch (SQLException e) {
e.printStackTrace();
logger.error("showCountNodes() failed", e);
fail(e.getMessage());
}
}
......@@ -371,8 +412,6 @@ public class IoTDBMetadataFetchIT {
}
/**
* show metadata in json
*/
......@@ -394,10 +433,12 @@ public class IoTDBMetadataFetchIT {
+ "\t\t\t\t\t\t\"Encoding\":\"RLE\"\n"
+ "\t\t\t\t\t},\n"
+ "\t\t\t\t\t\"status\":{\n"
+ "\t\t\t\t\t\t\"StorageGroup\":\"root.ln.wf01.wt01\",\n"
+ "\t\t\t\t\t\t\"DataType\":\"BOOLEAN\",\n"
+ "\t\t\t\t\t\t\"Compressor\":\"SNAPPY\",\n"
+ "\t\t\t\t\t\t\"Encoding\":\"PLAIN\"\n"
+ "\t\t\t\t\t\t\"s1\":{\n"
+ "\t\t\t\t\t\t\t\"StorageGroup\":\"root.ln.wf01.wt01\",\n"
+ "\t\t\t\t\t\t\t\"DataType\":\"BOOLEAN\",\n"
+ "\t\t\t\t\t\t\t\"Compressor\":\"SNAPPY\",\n"
+ "\t\t\t\t\t\t\t\"Encoding\":\"PLAIN\"\n"
+ "\t\t\t\t\t\t}\n"
+ "\t\t\t\t\t}\n"
+ "\t\t\t\t}\n"
+ "\t\t\t}\n"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册