提交 6e2dcead 编写于 作者: L lta

add author and loaddata plan codec

上级 1f626c4b
......@@ -20,6 +20,7 @@ package org.apache.iotdb.db.qp.physical.sys;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.read.common.Path;
......@@ -54,4 +55,22 @@ public class LoadDataPlan extends PhysicalPlan {
public String getMeasureType() {
return measureType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof LoadDataPlan)) {
return false;
}
LoadDataPlan that = (LoadDataPlan) o;
return Objects.equals(getInputFilePath(), that.getInputFilePath()) &&
Objects.equals(getMeasureType(), that.getMeasureType());
}
@Override
public int hashCode() {
return Objects.hash(getInputFilePath(), getMeasureType());
}
}
......@@ -37,6 +37,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
import org.apache.iotdb.db.utils.ByteBufferUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
......@@ -312,7 +313,7 @@ public class CodecInstances {
if (permissionListLen != -1) {
permissions = new HashSet<>(permissionListLen);
for(int i =0 ; i < permissionListLen; i ++) {
permissions.add(buffer.getInt());
permissions.add(buffer.getInt());
}
}
AuthorPlan authorPlan = null;
......@@ -327,4 +328,35 @@ public class CodecInstances {
}
};
static final Codec<LoadDataPlan> loadDataPlanCodec = new Codec<LoadDataPlan>() {
ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<>();
@Override
public byte[] encode(LoadDataPlan plan) {
int type = SystemLogOperator.LOADDATA;
if (localBuffer.get() == null) {
localBuffer.set(ByteBuffer.allocate(config.getMaxLogEntrySize()));
}
ByteBuffer buffer = localBuffer.get();
buffer.clear();
buffer.put((byte) type);
ByteBufferUtils.putString(buffer, plan.getInputFilePath());
ByteBufferUtils.putString(buffer, plan.getMeasureType());
return Arrays.copyOfRange(buffer.array(), 0, buffer.position());
}
@Override
public LoadDataPlan decode(byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.get(); // read and skip an int representing "type"
String inputFilePath = ByteBufferUtils.readString(buffer);
String measureType = ByteBufferUtils.readString(buffer);
return new LoadDataPlan(inputFilePath, measureType);
}
};
}
......@@ -39,7 +39,8 @@ public enum PhysicalPlanCodec {
UPDATEPLAN(SystemLogOperator.UPDATE, CodecInstances.updatePlanCodec),
DELETEPLAN(SystemLogOperator.DELETE, CodecInstances.deletePlanCodec),
METADATAPLAN(SystemLogOperator.METADATA, CodecInstances.metadataPlanCodec),
AUTHORPLAN(SystemLogOperator.AUTHOR, CodecInstances.authorPlanCodec);
AUTHORPLAN(SystemLogOperator.AUTHOR, CodecInstances.authorPlanCodec),
LOADDATAPLAN(SystemLogOperator.LOADDATA, CodecInstances.loadDataPlanCodec);
private static final HashMap<Integer, PhysicalPlanCodec> codecMap = new HashMap<>();
......
......@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
public class PhysicalPlanLogTransfer {
......@@ -46,7 +47,9 @@ public class PhysicalPlanLogTransfer {
codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.METADATA).codec;
} else if (plan instanceof AuthorPlan) {
codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.AUTHOR).codec;
} else {
} else if (plan instanceof LoadDataPlan) {
codec = (Codec<PhysicalPlan>) PhysicalPlanCodec.fromOpcode(SystemLogOperator.LOADDATA).codec;
} else{
throw new UnsupportedOperationException(
"SystemLogOperator given is not supported. " + plan.getOperatorType());
}
......
......@@ -30,4 +30,5 @@ public class SystemLogOperator {
public static final int DELETE = 2;
public static final int METADATA = 3;
public static final int AUTHOR = 4;
public static final int LOADDATA = 5;
}
......@@ -31,6 +31,7 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
import org.apache.iotdb.db.qp.utils.MemIntQpExecutor;
import org.apache.iotdb.tsfile.read.common.Path;
......@@ -44,25 +45,30 @@ public class PhysicalPlanLogTransferTest {
private DeletePlan deletePlan = new DeletePlan(50, new Path("root.vehicle.device"));
private UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0",
new Path("root.vehicle.device.sensor"));
private LoadDataPlan loadDataPlan = new LoadDataPlan("/tmp/data/vehicle","sensor");
@Test
public void operatorToLog()
throws IOException, ArgsErrorException, ProcessorException, QueryProcessorException {
/** Insert Plan test **/
byte[] insertPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(insertPlan);
Codec<InsertPlan> insertPlanCodec = CodecInstances.multiInsertPlanCodec;
byte[] insertPlanProperty = insertPlanCodec.encode(insertPlan);
assertEquals(true, Arrays.equals(insertPlanProperty, insertPlanBytesTest));
/** Delete Plan test **/
byte[] deletePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(deletePlan);
Codec<DeletePlan> deletePlanCodec = CodecInstances.deletePlanCodec;
byte[] deletePlanProperty = deletePlanCodec.encode(deletePlan);
assertEquals(true, Arrays.equals(deletePlanProperty, deletePlanBytesTest));
/** Update Plan test **/
byte[] updatePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(updatePlan);
Codec<UpdatePlan> updatePlanCodec = CodecInstances.updatePlanCodec;
byte[] updatePlanProperty = updatePlanCodec.encode(updatePlan);
assertEquals(true, Arrays.equals(updatePlanProperty, updatePlanBytesTest));
/** Metadata Plan test **/
String metadataStatement = "create timeseries root.vehicle.d1.s1 with datatype=INT32,encoding=RLE";
MetadataPlan metadataPlan = (MetadataPlan) processor.parseSQLToPhysicalPlan(metadataStatement);
byte[] metadataPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(metadataPlan);
......@@ -70,6 +76,7 @@ public class PhysicalPlanLogTransferTest {
byte[] metadataPlanProperty = metadataPlanCodec.encode(metadataPlan);
assertEquals(true, Arrays.equals(metadataPlanProperty, metadataPlanBytesTest));
/** Author Plan test **/
String sql = "grant role xm privileges 'SET_STORAGE_GROUP','DELETE_TIMESERIES' on root.vehicle.device.sensor";
AuthorPlan authorPlan = (AuthorPlan) processor.parseSQLToPhysicalPlan(sql);
byte[] authorPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(authorPlan);
......@@ -77,27 +84,37 @@ public class PhysicalPlanLogTransferTest {
byte[] authorPlanProperty = authorPlanCodec.encode(authorPlan);
assertEquals(true, Arrays.equals(authorPlanProperty, authorPlanBytesTest));
/** LoadData Plan test **/
byte[] loadDataPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(loadDataPlan);
Codec<LoadDataPlan> loadDataPlanCodec = CodecInstances.loadDataPlanCodec;
byte[] loadDataPlanProperty = loadDataPlanCodec.encode(loadDataPlan);
assertEquals(true, Arrays.equals(loadDataPlanProperty, loadDataPlanBytesTest));
}
@Test
public void logToOperator()
throws IOException, ArgsErrorException, ProcessorException, QueryProcessorException, AuthException {
/** Insert Plan test **/
byte[] insertPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(insertPlan);
InsertPlan insertPlanTest = (InsertPlan) PhysicalPlanLogTransfer
.logToOperator(insertPlanBytesTest);
assertEquals(true, insertPlanTest.equals(insertPlan));
/** Delete Plan test **/
byte[] deletePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(deletePlan);
DeletePlan deletePlanTest = (DeletePlan) PhysicalPlanLogTransfer
.logToOperator(deletePlanBytesTest);
assertEquals(true, deletePlanTest.equals(deletePlan));
/** Update Plan test **/
byte[] updatePlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(updatePlan);
UpdatePlan updatePlanTest = (UpdatePlan) PhysicalPlanLogTransfer
.logToOperator(updatePlanBytesTest);
assertEquals(true, updatePlanTest.equals(updatePlan));
/** Metadata Plan test **/
String metadataStatement = "create timeseries root.vehicle.d1.s1 with datatype=INT32,encoding=RLE";
MetadataPlan metadataPlan = (MetadataPlan) processor.parseSQLToPhysicalPlan(metadataStatement);
byte[] metadataPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(metadataPlan);
......@@ -105,11 +122,19 @@ public class PhysicalPlanLogTransferTest {
.logToOperator(metadataPlanBytesTest);
assertEquals(true, metadataPlanTest.equals(metadataPlan));
/** Author Plan test **/
String sql = "grant role xm privileges 'SET_STORAGE_GROUP','DELETE_TIMESERIES' on root.vehicle.device.sensor";
AuthorPlan authorPlan = (AuthorPlan) processor.parseSQLToPhysicalPlan(sql);
byte[] authorPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(authorPlan);
AuthorPlan authorPlanTest = (AuthorPlan) PhysicalPlanLogTransfer.logToOperator(authorPlanBytesTest);
AuthorPlan authorPlanTest = (AuthorPlan) PhysicalPlanLogTransfer
.logToOperator(authorPlanBytesTest);
assertEquals(true, authorPlanTest.equals(authorPlan));
/** LoadData Plan test **/
byte[] loadDataPlanBytesTest = PhysicalPlanLogTransfer.operatorToLog(loadDataPlan);
LoadDataPlan loadDataPlanTest = (LoadDataPlan) PhysicalPlanLogTransfer
.logToOperator(loadDataPlanBytesTest);
assertEquals(true, loadDataPlan.equals(loadDataPlanTest));
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册