未验证 提交 922075b1 编写于 作者: H Hang Ji 提交者: GitHub

[IOTDB-1417] UDF meta plans for cluster (#3503)

上级 abfa6954
......@@ -29,10 +29,12 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.ClearCachePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateSnapshotPlan;
import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan.LoadConfigurationPlanType;
......@@ -90,6 +92,8 @@ public class PartitionUtils {
// DataAuthPlan is global because all nodes must have all user info
|| plan instanceof DataAuthPlan
|| plan instanceof CreateTemplatePlan
|| plan instanceof CreateFunctionPlan
|| plan instanceof DropFunctionPlan
|| plan instanceof CreateSnapshotPlan;
}
......
......@@ -38,6 +38,7 @@ import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
import org.apache.iotdb.db.qp.physical.sys.ClearCachePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateIndexPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateSnapshotPlan;
......@@ -47,6 +48,7 @@ import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
......@@ -392,6 +394,12 @@ public abstract class PhysicalPlan {
case CLEARCACHE:
plan = new ClearCachePlan();
break;
case CREATE_FUNCTION:
plan = new CreateFunctionPlan();
break;
case DROP_FUNCTION:
plan = new DropFunctionPlan();
break;
default:
throw new IOException("unrecognized log type " + type);
}
......@@ -453,7 +461,9 @@ public abstract class PhysicalPlan {
SHOW_CONTINUOUS_QUERIES,
MERGE,
CREATE_SNAPSHOT,
CLEARCACHE
CLEARCACHE,
CREATE_FUNCTION,
DROP_FUNCTION
}
public long getIndex() {
......
......@@ -19,18 +19,26 @@
package org.apache.iotdb.db.qp.physical.sys;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class CreateFunctionPlan extends PhysicalPlan {
private final boolean isTemporary;
private final String udfName;
private final String className;
private boolean isTemporary;
private String udfName;
private String className;
public CreateFunctionPlan() {
super(false, OperatorType.CREATE_FUNCTION);
}
public CreateFunctionPlan(boolean isTemporary, String udfName, String className) {
super(false, OperatorType.CREATE_FUNCTION);
......@@ -51,8 +59,39 @@ public class CreateFunctionPlan extends PhysicalPlan {
return className;
}
public void setTemporary(boolean temporary) {
isTemporary = temporary;
}
public void setUdfName(String udfName) {
this.udfName = udfName;
}
public void setClassName(String className) {
this.className = className;
}
@Override
public List<PartialPath> getPaths() {
return new ArrayList<>();
}
@Override
public void serialize(DataOutputStream outputStream) throws IOException {
outputStream.writeByte((byte) PhysicalPlanType.CREATE_FUNCTION.ordinal());
outputStream.writeBoolean(isTemporary);
putString(outputStream, udfName);
putString(outputStream, className);
outputStream.writeLong(index);
}
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
isTemporary = buffer.get() == 1;
udfName = readString(buffer);
className = readString(buffer);
this.index = buffer.getLong();
}
}
......@@ -19,16 +19,24 @@
package org.apache.iotdb.db.qp.physical.sys;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class DropFunctionPlan extends PhysicalPlan {
private final String udfName;
private String udfName;
public DropFunctionPlan() {
super(false, OperatorType.DROP_FUNCTION);
}
public DropFunctionPlan(String udfName) {
super(false, OperatorType.DROP_FUNCTION);
......@@ -43,4 +51,19 @@ public class DropFunctionPlan extends PhysicalPlan {
public List<PartialPath> getPaths() {
return new ArrayList<>();
}
@Override
public void serialize(DataOutputStream outputStream) throws IOException {
outputStream.writeByte((byte) PhysicalPlanType.DROP_FUNCTION.ordinal());
putString(outputStream, udfName);
outputStream.writeLong(index);
}
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
udfName = readString(buffer);
this.index = buffer.getLong();
}
}
......@@ -49,6 +49,7 @@ import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowContinuousQueriesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType;
import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan;
......@@ -1449,4 +1450,13 @@ public class PhysicalPlanTest {
(ShowContinuousQueriesPlan) processor.parseSQLToPhysicalPlan(sql);
Assert.assertTrue(plan.isQuery());
}
@Test
public void testShowFunction() throws QueryProcessException {
String sql = "SHOW FUNCTIONS";
ShowFunctionsPlan plan = (ShowFunctionsPlan) processor.parseSQLToPhysicalPlan(sql);
Assert.assertTrue(plan.isQuery());
Assert.assertEquals(ShowContentType.FUNCTIONS, plan.getShowContentType());
}
}
......@@ -55,6 +55,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
......
......@@ -18,6 +18,13 @@
*/
package org.apache.iotdb.db.sql;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.Planner;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateFunctionPlan;
import org.apache.iotdb.db.qp.physical.sys.DropFunctionPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
......@@ -46,6 +53,7 @@ public abstract class Cases {
protected Statement[] readStatements;
protected Connection[] readConnections;
protected Session session;
private final Planner processor = new Planner();
/** initialize the writeStatement,writeConnection, readStatements and the readConnections. */
public abstract void init() throws Exception;
......@@ -324,6 +332,64 @@ public abstract class Cases {
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
}
@Test
public void testCreateFunctionPlan1() {
try {
PhysicalPlan plan =
processor.parseSQLToPhysicalPlan(
"create function udf as \"org.apache.iotdb.db.query.udf.example.Adder\"");
if (plan.isQuery() || !(plan instanceof CreateFunctionPlan)) {
Assert.fail();
}
CreateFunctionPlan createFunctionPlan = (CreateFunctionPlan) plan;
Assert.assertEquals("udf", createFunctionPlan.getUdfName());
Assert.assertEquals(
"org.apache.iotdb.db.query.udf.example.Adder", createFunctionPlan.getClassName());
Assert.assertFalse(createFunctionPlan.isTemporary());
} catch (QueryProcessException e) {
Assert.fail(e.toString());
}
}
@Test
public void testCreateFunctionPlan2() { // create temporary function
try {
PhysicalPlan plan =
processor.parseSQLToPhysicalPlan(
"create temporary function udf as \"org.apache.iotdb.db.query.udf.example.Adder\"");
if (plan.isQuery() || !(plan instanceof CreateFunctionPlan)) {
Assert.fail();
}
CreateFunctionPlan createFunctionPlan = (CreateFunctionPlan) plan;
Assert.assertEquals("udf", createFunctionPlan.getUdfName());
Assert.assertEquals(
"org.apache.iotdb.db.query.udf.example.Adder", createFunctionPlan.getClassName());
Assert.assertTrue(createFunctionPlan.isTemporary());
} catch (QueryProcessException e) {
Assert.fail(e.toString());
}
}
@Test
public void testDropFunctionPlan() { // drop function
try {
DropFunctionPlan dropFunctionPlan =
(DropFunctionPlan) processor.parseSQLToPhysicalPlan("drop function udf");
Assert.assertEquals("udf", dropFunctionPlan.getUdfName());
} catch (QueryProcessException e) {
Assert.fail(e.toString());
}
}
@Test
public void testShowFunction() throws QueryProcessException {
String sql = "SHOW FUNCTIONS";
ShowFunctionsPlan plan = (ShowFunctionsPlan) processor.parseSQLToPhysicalPlan(sql);
Assert.assertTrue(plan.isQuery());
Assert.assertEquals(ShowPlan.ShowContentType.FUNCTIONS, plan.getShowContentType());
}
// test https://issues.apache.org/jira/browse/IOTDB-1407
@Test
public void showTimeseriesTagsTest() throws SQLException {
......@@ -408,9 +474,11 @@ public abstract class Cases {
}
@Test
public void testApplyClearCache() {
public void testApplyClearCache() throws InterruptedException {
String sql = "CLEAR CACHE";
try {
// Wait for 3S so that the leader can be elected
Thread.sleep(3000);
writeStatement.execute(sql);
} catch (SQLException e) {
Assert.assertNull(e);
......@@ -418,9 +486,11 @@ public abstract class Cases {
}
@Test
public void testApplyMerge() {
public void testApplyMerge() throws InterruptedException {
String sql = "MERGE";
try {
// Wait for 3S so that the leader can be elected
Thread.sleep(3000);
writeStatement.execute(sql);
} catch (SQLException e) {
Assert.assertNull(e);
......@@ -428,9 +498,11 @@ public abstract class Cases {
}
@Test
public void testCreateSnapshot() {
public void testCreateSnapshot() throws InterruptedException {
String sql = "CREATE SNAPSHOT FOR SCHEMA";
try {
// Wait for 3S so that the leader can be elected
Thread.sleep(3000);
writeStatement.execute(sql);
} catch (SQLException e) {
Assert.assertNull(e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册