未验证 提交 d71da597 编写于 作者: H Haimei Guo 提交者: GitHub

IOTDB-851: enhance failure tolerance when recover WAL (#1665)

上级 b3ea173d
......@@ -688,10 +688,15 @@ public class MManager {
MeasurementSchema[] measurementSchemas = new MeasurementSchema[measurements.length];
for (int i = 0; i < measurementSchemas.length; i++) {
if (!deviceNode.hasChild(measurements[i])) {
throw new MetadataException(measurements[i] + " does not exist in " + deviceId);
if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
measurementSchemas[i] = null;
} else {
throw new MetadataException(measurements[i] + " does not exist in " + deviceId);
}
} else {
measurementSchemas[i] = ((MeasurementMNode) deviceNode.getChild(measurements[i]))
.getSchema();
}
measurementSchemas[i] = ((MeasurementMNode) deviceNode.getChild(measurements[i]))
.getSchema();
}
return measurementSchemas;
} finally {
......
......@@ -165,11 +165,17 @@ public class LogReplayer {
}
if (plan instanceof InsertRowPlan) {
InsertRowPlan tPlan = (InsertRowPlan) plan;
//only infer type when users pass a String value
//WAL already serializes the real data type, so no need to infer type
((InsertRowPlan) plan).setNeedInferType(false);
tPlan.setSchemasAndTransferType(schemas);
//mark failed plan manually
checkDataTypeAndMarkFailed(schemas, tPlan);
recoverMemTable.insert(tPlan);
} else {
InsertTabletPlan tPlan = (InsertTabletPlan) plan;
tPlan.setSchemas(schemas);
checkDataTypeAndMarkFailed(schemas, tPlan);
recoverMemTable.insertTablet(tPlan, 0, tPlan.getRowCount());
}
}
......@@ -179,4 +185,12 @@ public class LogReplayer {
// TODO: support update
throw new UnsupportedOperationException("Update not supported");
}
private void checkDataTypeAndMarkFailed(final MeasurementSchema[] schemas, InsertPlan tPlan) {
for (int i = 0; i < schemas.length; i++) {
if (schemas[i] == null || schemas[i].getType() != tPlan.getDataTypes()[i]) {
tPlan.markFailedMeasurementInsertion(i);
}
}
}
}
......@@ -281,4 +281,73 @@ public class IoTDBRestartIT {
EnvironmentUtils.cleanEnv();
}
@Test
public void testRecoverWALMismatchDataType() throws Exception {
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
try(Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
Statement statement = connection.createStatement()){
statement.execute("insert into root.turbine1.d1(timestamp,s1,s2) values(1,1.1,2.2)");
statement.execute("delete timeseries root.turbine1.d1.s1");
statement.execute("create timeseries root.turbine1.d1.s1 with datatype=INT32, encoding=RLE, compression=SNAPPY");
}
EnvironmentUtils.restartDaemon();
try(Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
Statement statement = connection.createStatement()){
boolean hasResultSet = statement.execute("select * from root");
assertTrue(hasResultSet);
ResultSet resultSet = statement.getResultSet();
int cnt = 0;
while (resultSet.next()) {
cnt++;
}
assertEquals(1, cnt);
}
EnvironmentUtils.cleanEnv();
}
@Test
public void testRecoverWALDeleteSchema() throws Exception {
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
try(Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
Statement statement = connection.createStatement()){
statement.execute("insert into root.turbine1.d1(timestamp,s1,s2) values(1,1.1,2.2)");
statement.execute("delete timeseries root.turbine1.d1.s1");
}
EnvironmentUtils.restartDaemon();
try(Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
Statement statement = connection.createStatement()){
boolean hasResultSet = statement.execute("select * from root");
assertTrue(hasResultSet);
ResultSet resultSet = statement.getResultSet();
int cnt = 0;
while (resultSet.next()) {
cnt++;
}
assertEquals(1, cnt);
}
EnvironmentUtils.cleanEnv();
}
}
......@@ -21,11 +21,14 @@ package org.apache.iotdb.db.writelog.recover;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
......@@ -37,11 +40,15 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
......@@ -51,6 +58,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -90,8 +99,8 @@ public class LogReplayerTest {
IoTDB.metaManager.setStorageGroup(new PartialPath("root.sg"));
try {
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 5; j++) {
for (int i = 0; i <= 5; i++) {
for (int j = 0; j <= 5; j++) {
IoTDB.metaManager
.createTimeseries(new PartialPath("root.sg.device" + i + ".sensor" + j), TSDataType.INT64,
TSEncoding.PLAIN, TSFileDescriptor.getInstance().getConfig().getCompressor(),
......@@ -113,6 +122,7 @@ public class LogReplayerTest {
node.write(new InsertRowPlan(new PartialPath("root.sg.device" + i), i, "sensor" + i, TSDataType.INT64,
String.valueOf(i)));
}
node.write(insertTablePlan());
DeletePlan deletePlan = new DeletePlan(0, 200, new PartialPath("root.sg.device0.sensor0"));
node.write(deletePlan);
node.close();
......@@ -147,6 +157,25 @@ public class LogReplayerTest {
assertEquals(i, tsFileResource.getStartTime("root.sg.device" + i));
assertEquals(i, tsFileResource.getEndTime("root.sg.device" + i));
}
//test insert tablet
for (int i = 0; i < 2 ; i++) {
ReadOnlyMemChunk memChunk = memTable
.query("root.sg.device5", "sensor" + i, TSDataType.INT64,
TSEncoding.PLAIN, Collections.emptyMap(), Long.MIN_VALUE);
//s0 has datatype boolean, but required INT64, will return null
if (i == 0) {
assertNull(memChunk);
} else {
IPointReader iterator = memChunk.getPointReader();
iterator.hasNextTimeValuePair();
for (int time = 0; time < 100; time++) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(time, timeValuePair.getValue().getLong());
}
}
}
} finally {
modFile.close();
MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + tsFile.getName());
......@@ -155,4 +184,49 @@ public class LogReplayerTest {
tsFile.getParentFile().delete();
}
}
/**
* insert tablet plan, time series expected datatype is INT64
* s0 is set to boolean, it will output null value
* s1 is set to INT64, it will output its value
* @return
* @throws IllegalPathException
* @throws IOException
*/
public InsertTabletPlan insertTablePlan() throws IllegalPathException, IOException {
String[] measurements = new String[2];
measurements[0] = "sensor0";
measurements[1] = "sensor1";
List<Integer> dataTypes = new ArrayList<>();
dataTypes.add(TSDataType.BOOLEAN.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
String deviceId = "root.sg.device5";
MNode deviceMNode = new MNode(null, deviceId);
deviceMNode.addChild("sensor0", new MeasurementMNode(null, null, null, null));
deviceMNode.addChild("sensor1", new MeasurementMNode(null, null, null, null));
InsertTabletPlan insertTabletPlan = new InsertTabletPlan(new PartialPath(deviceId), measurements, dataTypes);
long[] times = new long[100];
Object[] columns = new Object[2];
columns[0] = new boolean[100];
columns[1] = new long[100];
for (long r = 0; r < 100; r++) {
times[(int)r] = r;
((boolean[]) columns[0])[(int)r] = false;
((long[]) columns[1])[(int)r] = r;
}
insertTabletPlan.setTimes(times);
insertTabletPlan.setColumns(columns);
insertTabletPlan.setRowCount(times.length);
insertTabletPlan.setDeviceMNode(deviceMNode);
insertTabletPlan.setStart(0);
insertTabletPlan.setEnd(100);
return insertTabletPlan;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册