未验证 提交 403f0596 编写于 作者: S SilverNarcissus 提交者: GitHub

[IOTDB-2338] ID Table recovery (#4897)

[IOTDB-2338] ID Table recovery (#4897)
上级 b9e1f151
......@@ -618,8 +618,8 @@ public class MManager {
throw new MetadataException(e);
}
// update id table
if (config.isEnableIDTable()) {
// update id table if not in recovering or disable id table log file
if (config.isEnableIDTable() && (!isRecovering || !config.isEnableIDTableLogFile())) {
IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPath().getDevicePath());
idTable.createTimeseries(plan);
}
......@@ -713,8 +713,8 @@ public class MManager {
throw new MetadataException(e);
}
// update id table
if (config.isEnableIDTable()) {
// update id table if not in recovering or disable id table log file
if (config.isEnableIDTable() && (!isRecovering || !config.isEnableIDTableLogFile())) {
IDTable idTable = IDTableManager.getInstance().getIDTable(plan.getPrefixPath());
idTable.createAlignedTimeseries(plan);
}
......
......@@ -19,12 +19,19 @@
package org.apache.iotdb.db.metadata.idtable;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.idtable.entry.DiskSchemaEntry;
import org.apache.iotdb.db.metadata.idtable.entry.SchemaEntry;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
......@@ -40,6 +47,9 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
private static final String FILE_NAME = "SeriesKeyMapping.meta";
// file version to distinguish different id table file
private static final String FILE_VERSION = "AppendOnly_V1";
File dataFile;
OutputStream outputStream;
......@@ -51,7 +61,11 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
public AppendOnlyDiskSchemaManager(File dir) {
try {
initFile(dir);
outputStream = new FileOutputStream(dataFile);
outputStream = new FileOutputStream(dataFile, true);
// we write file version to new file
if (loc == 0) {
ReadWriteIOUtils.write(FILE_VERSION, outputStream);
}
} catch (IOException e) {
logger.error(e.getMessage());
throw new IllegalArgumentException("can't initialize disk schema manager at " + dataFile);
......@@ -69,7 +83,7 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
dataFile = new File(dir, FILE_NAME);
if (dataFile.exists()) {
loc = dataFile.length();
if (!checkLastEntry(loc)) {
if (!checkFileConsistency(loc)) {
throw new IOException("File corruption");
}
} else {
......@@ -84,7 +98,12 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
}
}
private boolean checkLastEntry(long pos) {
private boolean checkFileConsistency(long pos) {
// empty file
if (pos == 0) {
return true;
}
// file length is smaller than one int
if (pos <= Integer.BYTES) {
return false;
......@@ -92,20 +111,33 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
pos -= Integer.BYTES;
try (RandomAccessFile randomAccessFile = new RandomAccessFile(dataFile, "r");
FileInputStream inputStream = new FileInputStream(dataFile)) {
BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(dataFile))) {
// check file version
inputStream.mark(Integer.BYTES + (FILE_VERSION.length() << 2));
String version = ReadWriteIOUtils.readString(inputStream);
if (!FILE_VERSION.equals(version)) {
logger.error("File version isn't right, need: {}, actual: {} ", FILE_VERSION, version);
return false;
}
inputStream.reset();
// check last entry
randomAccessFile.seek(pos);
int lastEntrySize = randomAccessFile.readInt();
// last int is not right
if (pos - lastEntrySize < 0) {
logger.error("Last entry size isn't right");
return false;
}
long realSkip = inputStream.skip(pos - lastEntrySize);
// file length isn't right
if (realSkip != pos - lastEntrySize) {
logger.error("File length isn't right");
return false;
}
// try to deserialize last entry
DiskSchemaEntry.deserialize(inputStream);
} catch (Exception e) {
logger.error("can't deserialize last entry, file corruption." + e);
......@@ -117,37 +149,63 @@ public class AppendOnlyDiskSchemaManager implements IDiskSchemaManager {
@Override
public long serialize(DiskSchemaEntry schemaEntry) {
long beforeLoc = loc;
try {
schemaEntry.serialize(outputStream);
loc += schemaEntry.serialize(outputStream);
} catch (IOException e) {
logger.error("failed to serialize schema entry: " + schemaEntry);
throw new IllegalArgumentException("can't serialize disk entry of " + schemaEntry);
}
return 0;
return beforeLoc;
}
@Override
public void recover(IDTable idTable) {
long loc = 0;
try (FileInputStream inputStream = new FileInputStream(dataFile)) {
// read file version
ReadWriteIOUtils.readString(inputStream);
while (inputStream.available() > 0) {
DiskSchemaEntry cur = DiskSchemaEntry.deserialize(inputStream);
SchemaEntry schemaEntry =
new SchemaEntry(
TSDataType.deserialize(cur.type),
TSEncoding.deserialize(cur.encoding),
CompressionType.deserialize(cur.compressor),
loc);
idTable.putSchemaEntry(cur.deviceID, cur.measurementName, schemaEntry, cur.isAligned);
loc += cur.entrySize;
}
} catch (IOException | MetadataException e) {
logger.error("ID table can't recover from log: {}", dataFile);
}
}
@TestOnly
public Collection<DiskSchemaEntry> getAllSchemaEntry() throws IOException {
FileInputStream inputStream = new FileInputStream(dataFile);
List<DiskSchemaEntry> res = new ArrayList<>();
// for test, we read at most 1000 entries.
int maxCount = 1000;
while (maxCount > 0) {
try {
maxCount--;
DiskSchemaEntry cur = DiskSchemaEntry.deserialize(inputStream);
res.add(cur);
} catch (IOException e) {
logger.debug("read finished");
break;
try (FileInputStream inputStream = new FileInputStream(dataFile)) {
// read file version
ReadWriteIOUtils.readString(inputStream);
// for test, we read at most 1000 entries.
int maxCount = 1000;
while (maxCount > 0) {
try {
maxCount--;
DiskSchemaEntry cur = DiskSchemaEntry.deserialize(inputStream);
res.add(cur);
} catch (IOException e) {
logger.debug("read finished");
break;
}
}
}
// free resource
inputStream.close();
return res;
}
......
......@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
import org.apache.iotdb.db.metadata.idtable.entry.SchemaEntry;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.AlignedPath;
......@@ -75,24 +76,6 @@ public interface IDTable {
*/
IDeviceID getSeriesSchemas(InsertPlan plan) throws MetadataException;
/**
* update latest flushed time of one timeseries
*
* @param timeseriesID timeseries id
* @param flushTime latest flushed time
* @throws MetadataException throw if this timeseries is not exist
*/
void updateLatestFlushTime(TimeseriesID timeseriesID, long flushTime) throws MetadataException;
/**
* update latest flushed time of one timeseries
*
* @param timeseriesID timeseries id
* @return latest flushed time of one timeseries
* @throws MetadataException throw if this timeseries is not exist
*/
long getLatestFlushedTime(TimeseriesID timeseriesID) throws MetadataException;
/**
* register trigger to the timeseries
*
......@@ -154,6 +137,18 @@ public interface IDTable {
*/
public List<DeviceEntry> getAllDeviceEntry();
/**
* put schema entry to id table, currently used in recover
*
* @param devicePath device path (can be device id formed path)
* @param measurement measurement name
* @param schemaEntry schema entry to put
* @param isAligned is the device aligned
*/
public void putSchemaEntry(
String devicePath, String measurement, SchemaEntry schemaEntry, boolean isAligned)
throws MetadataException;
/**
* translate query path's device path to device id
*
......
......@@ -79,6 +79,7 @@ public class IDTableHashmapImpl implements IDTable {
}
if (config.isEnableIDTableLogFile()) {
IDiskSchemaManager = new AppendOnlyDiskSchemaManager(storageGroupDir);
IDiskSchemaManager.recover(this);
}
}
......@@ -90,7 +91,7 @@ public class IDTableHashmapImpl implements IDTable {
*/
public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan)
throws MetadataException {
DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPrefixPath(), true);
DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPrefixPath().toString(), true);
for (int i = 0; i < plan.getMeasurements().size(); i++) {
PartialPath fullPath =
......@@ -102,6 +103,7 @@ public class IDTableHashmapImpl implements IDTable {
plan.getCompressors().get(i),
deviceEntry.getDeviceID(),
fullPath,
true,
IDiskSchemaManager);
deviceEntry.putSchemaEntry(plan.getMeasurements().get(i), schemaEntry);
}
......@@ -114,7 +116,7 @@ public class IDTableHashmapImpl implements IDTable {
* @throws MetadataException if the device is aligned, throw it
*/
public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPath().getDevicePath(), false);
DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPath().getDevice(), false);
SchemaEntry schemaEntry =
new SchemaEntry(
plan.getDataType(),
......@@ -122,6 +124,7 @@ public class IDTableHashmapImpl implements IDTable {
plan.getCompressor(),
deviceEntry.getDeviceID(),
plan.getPath(),
false,
IDiskSchemaManager);
deviceEntry.putSchemaEntry(plan.getPath().getMeasurement(), schemaEntry);
}
......@@ -139,7 +142,8 @@ public class IDTableHashmapImpl implements IDTable {
IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
// 1. get device entry and check align
DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(devicePath, plan.isAligned());
DeviceEntry deviceEntry =
getDeviceEntryWithAlignedCheck(devicePath.toString(), plan.isAligned());
// 2. get schema of each measurement
for (int i = 0; i < measurementList.length; i++) {
......@@ -190,30 +194,6 @@ public class IDTableHashmapImpl implements IDTable {
return deviceEntry.getDeviceID();
}
/**
* update latest flushed time of one timeseries
*
* @param timeseriesID timeseries id
* @param flushTime latest flushed time
* @throws MetadataException throw if this timeseries is not exist
*/
public synchronized void updateLatestFlushTime(TimeseriesID timeseriesID, long flushTime)
throws MetadataException {
getSchemaEntry(timeseriesID).updateLastedFlushTime(flushTime);
}
/**
* update latest flushed time of one timeseries
*
* @param timeseriesID timeseries id
* @return latest flushed time of one timeseries
* @throws MetadataException throw if this timeseries is not exist
*/
public synchronized long getLatestFlushedTime(TimeseriesID timeseriesID)
throws MetadataException {
return getSchemaEntry(timeseriesID).getFlushTime();
}
/**
* register trigger to the timeseries
*
......@@ -224,7 +204,7 @@ public class IDTableHashmapImpl implements IDTable {
public synchronized void registerTrigger(PartialPath fullPath, IMeasurementMNode measurementMNode)
throws MetadataException {
boolean isAligned = measurementMNode.getParent().isAligned();
DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned);
DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevice(), isAligned);
deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUsingTrigger();
}
......@@ -239,7 +219,7 @@ public class IDTableHashmapImpl implements IDTable {
public synchronized void deregisterTrigger(
PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException {
boolean isAligned = measurementMNode.getParent().isAligned();
DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned);
DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevice(), isAligned);
deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUnUsingTrigger();
}
......@@ -305,6 +285,14 @@ public class IDTableHashmapImpl implements IDTable {
return res;
}
@Override
public void putSchemaEntry(
String devicePath, String measurement, SchemaEntry schemaEntry, boolean isAligned)
throws MetadataException {
DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(devicePath, isAligned);
deviceEntry.putSchemaEntry(measurement, schemaEntry);
}
/**
* check whether a time series is exist if exist, check the type consistency if not exist, call
* MManager to create it
......@@ -343,6 +331,7 @@ public class IDTableHashmapImpl implements IDTable {
schema.getCompressor(),
deviceEntry.getDeviceID(),
seriesKey,
deviceEntry.isAligned(),
IDiskSchemaManager);
deviceEntry.putSchemaEntry(measurementMNode.getName(), curEntry);
}
......@@ -372,7 +361,7 @@ public class IDTableHashmapImpl implements IDTable {
* @param isAligned whether the insert plan is aligned
* @return device entry of the timeseries
*/
private DeviceEntry getDeviceEntryWithAlignedCheck(PartialPath deviceName, boolean isAligned)
private DeviceEntry getDeviceEntryWithAlignedCheck(String deviceName, boolean isAligned)
throws MetadataException {
IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName);
int slot = calculateSlot(deviceID);
......
......@@ -35,6 +35,13 @@ public interface IDiskSchemaManager {
*/
public long serialize(DiskSchemaEntry schemaEntry);
/**
* recover id table from log file
*
* @param idTable id table need to be recovered
*/
public void recover(IDTable idTable);
/**
* get all disk schema entries from file
*
......
......@@ -19,8 +19,11 @@
package org.apache.iotdb.db.metadata.idtable.entry;
import org.apache.iotdb.db.utils.TestOnly;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/** device entry in id table */
public class DeviceEntry {
......@@ -146,4 +149,37 @@ public class DeviceEntry {
flushTimeMapOfEachPartition.clear();
}
// endregion
@TestOnly
public Map<String, SchemaEntry> getMeasurementMap() {
return measurementMap;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DeviceEntry)) {
return false;
}
DeviceEntry that = (DeviceEntry) o;
return isAligned == that.isAligned
&& globalFlushTime == that.globalFlushTime
&& deviceID.equals(that.deviceID)
&& measurementMap.equals(that.measurementMap)
&& lastTimeMapOfEachPartition.equals(that.lastTimeMapOfEachPartition)
&& flushTimeMapOfEachPartition.equals(that.flushTimeMapOfEachPartition);
}
@Override
public int hashCode() {
return Objects.hash(
deviceID,
measurementMap,
isAligned,
lastTimeMapOfEachPartition,
flushTimeMapOfEachPartition,
globalFlushTime);
}
}
......@@ -89,9 +89,9 @@ public class DeviceIDFactory {
.getConfig()
.getDeviceIDTransformationMethod()
.equals("SHA256")) {
getDeviceIDFunction = partialPath -> new SHA256DeviceID(partialPath.toString());
getDeviceIDFunction = SHA256DeviceID::new;
} else {
getDeviceIDFunction = partialPath -> new PlainDeviceID(partialPath.toString());
getDeviceIDFunction = PlainDeviceID::new;
}
}
}
......@@ -29,44 +29,61 @@ import java.io.OutputStream;
* the disk schema entry of schema entry of id table. This is a po class, so every field is public
*/
public class DiskSchemaEntry {
// id form device path, eg: 1#2#3#4
public String deviceID;
// full timeseries path, eg: root.sg1.d1.s1
public String seriesKey;
public long flushTime;
// measurement name of timeseries: eg: s1
public String measurementName;
// timeseries data type
public byte type;
// timeseries encoding type
public byte encoding;
// timeseries compressor type
public byte compressor;
// whether this device is aligned
public boolean isAligned;
// this entry's serialized size
public transient long entrySize;
private DiskSchemaEntry() {}
public DiskSchemaEntry(
String deviceID,
String seriesKey,
long flushTime,
String measurementName,
byte type,
byte encoding,
byte compressor) {
byte compressor,
boolean isAligned) {
this.deviceID = deviceID;
this.seriesKey = seriesKey;
this.flushTime = flushTime;
this.measurementName = measurementName;
this.type = type;
this.encoding = encoding;
this.compressor = compressor;
this.isAligned = isAligned;
}
public int serialize(OutputStream outputStream) throws IOException {
int byteLen = 0;
byteLen += ReadWriteIOUtils.write(deviceID, outputStream);
byteLen += ReadWriteIOUtils.write(seriesKey, outputStream);
byteLen += ReadWriteIOUtils.write(flushTime, outputStream);
byteLen += ReadWriteIOUtils.write(measurementName, outputStream);
byteLen += ReadWriteIOUtils.write(type, outputStream);
byteLen += ReadWriteIOUtils.write(encoding, outputStream);
byteLen += ReadWriteIOUtils.write(compressor, outputStream);
byteLen += ReadWriteIOUtils.write(isAligned, outputStream);
byteLen += ReadWriteIOUtils.write(byteLen, outputStream);
entrySize = byteLen;
return byteLen;
}
......@@ -75,12 +92,14 @@ public class DiskSchemaEntry {
DiskSchemaEntry res = new DiskSchemaEntry();
res.deviceID = ReadWriteIOUtils.readString(inputStream);
res.seriesKey = ReadWriteIOUtils.readString(inputStream);
res.flushTime = ReadWriteIOUtils.readLong(inputStream);
res.measurementName = ReadWriteIOUtils.readString(inputStream);
res.type = ReadWriteIOUtils.readByte(inputStream);
res.encoding = ReadWriteIOUtils.readByte(inputStream);
res.compressor = ReadWriteIOUtils.readByte(inputStream);
res.isAligned = ReadWriteIOUtils.readBool(inputStream);
// read byte len
ReadWriteIOUtils.readInt(inputStream);
res.entrySize = ReadWriteIOUtils.readInt(inputStream);
res.entrySize += Integer.BYTES;
return res;
}
......@@ -94,8 +113,6 @@ public class DiskSchemaEntry {
+ ", seriesKey='"
+ seriesKey
+ '\''
+ ", flushTime="
+ flushTime
+ ", type="
+ type
+ ", encoding="
......
......@@ -34,7 +34,7 @@ public class SHA256DeviceID implements IDeviceID {
long l3;
long l4;
private static final String SEPERATOR = "#";
private static final String SEPARATOR = "#";
/** using lots of message digest for improving parallelism */
private static MessageDigest[] md;
......@@ -71,7 +71,7 @@ public class SHA256DeviceID implements IDeviceID {
* @param deviceID a sha 256 string
*/
private void fromSHA256String(String deviceID) {
String[] part = deviceID.split(SEPERATOR);
String[] part = deviceID.split(SEPARATOR);
l1 = Long.parseLong(part[0]);
l2 = Long.parseLong(part[1]);
l3 = Long.parseLong(part[2]);
......@@ -143,6 +143,6 @@ public class SHA256DeviceID implements IDeviceID {
@Override
public String toStringID() {
return l1 + SEPERATOR + l2 + SEPERATOR + l3 + SEPERATOR + l4;
return l1 + SEPARATOR + l2 + SEPARATOR + l3 + SEPARATOR + l4;
}
}
......@@ -32,6 +32,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.util.Objects;
import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
/**
......@@ -51,8 +53,6 @@ public class SchemaEntry implements ILastCacheContainer {
private TsPrimitiveType lastValue;
private long flushTime;
/** This static field will not occupy memory */
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
......@@ -66,7 +66,18 @@ public class SchemaEntry implements ILastCacheContainer {
schema |= (((long) compressionType.serialize()) << 16);
lastTime = Long.MIN_VALUE;
flushTime = Long.MIN_VALUE;
}
// used in recover
public SchemaEntry(
TSDataType dataType, TSEncoding encoding, CompressionType compressionType, long diskPos) {
schema |= dataType.serialize();
schema |= (((long) encoding.serialize()) << 8);
schema |= (((long) compressionType.serialize()) << 16);
lastTime = Long.MIN_VALUE;
schema |= (diskPos << 25);
}
public SchemaEntry(
......@@ -75,13 +86,13 @@ public class SchemaEntry implements ILastCacheContainer {
CompressionType compressionType,
IDeviceID deviceID,
PartialPath fullPath,
boolean isAligned,
IDiskSchemaManager IDiskSchemaManager) {
schema |= dataType.serialize();
schema |= (((long) encoding.serialize()) << 8);
schema |= (((long) compressionType.serialize()) << 16);
lastTime = Long.MIN_VALUE;
flushTime = Long.MIN_VALUE;
// write log file
if (config.isEnableIDTableLogFile()) {
......@@ -89,10 +100,11 @@ public class SchemaEntry implements ILastCacheContainer {
new DiskSchemaEntry(
deviceID.toStringID(),
fullPath.getFullPath(),
flushTime,
fullPath.getMeasurement(),
dataType.serialize(),
encoding.serialize(),
compressionType.serialize());
compressionType.serialize(),
isAligned);
schema |= (IDiskSchemaManager.serialize(diskSchemaEntry) << 25);
}
}
......@@ -124,10 +136,6 @@ public class SchemaEntry implements ILastCacheContainer {
return CompressionType.deserialize((byte) (schema >> 16));
}
public void updateLastedFlushTime(long lastFlushTime) {
flushTime = Math.max(flushTime, lastFlushTime);
}
public boolean isUsingTrigger() {
return ((schema >> 24) & 1) == 1;
}
......@@ -149,10 +157,6 @@ public class SchemaEntry implements ILastCacheContainer {
return lastValue;
}
public long getFlushTime() {
return flushTime;
}
// region last cache
@Override
public TimeValuePair getCachedLast() {
......@@ -189,5 +193,24 @@ public class SchemaEntry implements ILastCacheContainer {
public boolean isEmpty() {
return lastValue == null;
}
@Override
// Notice that we only compare schema
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof SchemaEntry)) {
return false;
}
SchemaEntry that = (SchemaEntry) o;
return schema == that.schema;
}
@Override
// Notice that we only compare schema
public int hashCode() {
return Objects.hash(schema);
}
// endregion
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.metadata.idtable;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.Planner;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class IDTableRecoverTest {
private final Planner processor = new Planner();
private boolean isEnableIDTable = false;
private String originalDeviceIDTransformationMethod = null;
private boolean isEnableIDTableLogFile = false;
@Before
public void before() {
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
originalDeviceIDTransformationMethod =
IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
EnvironmentUtils.envSetUp();
}
@After
public void clean() throws IOException, StorageEngineException {
IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable);
IoTDBDescriptor.getInstance()
.getConfig()
.setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
EnvironmentUtils.cleanEnv();
}
@Test
public void testRecover() throws Exception {
insertDataInMemoryWithTablet(false);
insertDataInMemoryWithRecord(false);
PlanExecutor executor = new PlanExecutor();
PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
executor.processNonQuery(flushPlan);
IDTable idTable = IDTableManager.getInstance().getIDTable(new PartialPath("root.isp"));
List<DeviceEntry> memoryList = idTable.getAllDeviceEntry();
// restart
try {
EnvironmentUtils.restartDaemon();
} catch (Exception e) {
Assert.fail();
}
// check id table fields
idTable = IDTableManager.getInstance().getIDTable(new PartialPath("root.isp.d1"));
List<DeviceEntry> recoverList = idTable.getAllDeviceEntry();
assertEquals(memoryList, recoverList);
}
@Test
public void testRecoverAligned() throws Exception {
insertDataInMemoryWithTablet(true);
insertDataInMemoryWithRecord(false);
PlanExecutor executor = new PlanExecutor();
PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
executor.processNonQuery(flushPlan);
IDTable idTable = IDTableManager.getInstance().getIDTable(new PartialPath("root.isp"));
List<DeviceEntry> memoryList = idTable.getAllDeviceEntry();
// restart
try {
EnvironmentUtils.restartDaemon();
} catch (Exception e) {
Assert.fail();
}
// check id table fields
idTable = IDTableManager.getInstance().getIDTable(new PartialPath("root.isp.d1"));
List<DeviceEntry> recoverList = idTable.getAllDeviceEntry();
assertEquals(memoryList, recoverList);
}
private void insertDataInMemoryWithRecord(boolean isAligned)
throws IllegalPathException, QueryProcessException {
long time = 100L;
TSDataType[] dataTypes =
new TSDataType[] {
TSDataType.DOUBLE,
TSDataType.FLOAT,
TSDataType.INT64,
TSDataType.INT32,
TSDataType.BOOLEAN,
TSDataType.TEXT
};
String[] columns = new String[6];
columns[0] = 1.0 + "";
columns[1] = 2 + "";
columns[2] = 10000 + "";
columns[3] = 100 + "";
columns[4] = false + "";
columns[5] = "hh" + 0;
InsertRowPlan insertRowPlan =
new InsertRowPlan(
new PartialPath("root.isp.d1"),
time,
new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
dataTypes,
columns);
insertRowPlan.setAligned(isAligned);
PlanExecutor executor = new PlanExecutor();
executor.insert(insertRowPlan);
}
private void insertDataInMemoryWithTablet(boolean isAligned)
throws IllegalPathException, QueryProcessException {
long[] times = new long[] {110L, 111L, 112L, 113L};
List<Integer> dataTypes = new ArrayList<>();
dataTypes.add(TSDataType.DOUBLE.ordinal());
dataTypes.add(TSDataType.FLOAT.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
dataTypes.add(TSDataType.INT32.ordinal());
dataTypes.add(TSDataType.BOOLEAN.ordinal());
dataTypes.add(TSDataType.TEXT.ordinal());
Object[] columns = new Object[6];
columns[0] = new double[4];
columns[1] = new float[4];
columns[2] = new long[4];
columns[3] = new int[4];
columns[4] = new boolean[4];
columns[5] = new Binary[4];
for (int r = 0; r < 4; r++) {
((double[]) columns[0])[r] = 10.0 + r;
((float[]) columns[1])[r] = 20 + r;
((long[]) columns[2])[r] = 100000 + r;
((int[]) columns[3])[r] = 1000 + r;
((boolean[]) columns[4])[r] = false;
((Binary[]) columns[5])[r] = new Binary("mm" + r);
}
InsertTabletPlan tabletPlan =
new InsertTabletPlan(
new PartialPath("root.isp.d2"),
new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
dataTypes);
tabletPlan.setTimes(times);
tabletPlan.setColumns(columns);
tabletPlan.setRowCount(times.length);
tabletPlan.setAligned(isAligned);
PlanExecutor executor = new PlanExecutor();
executor.insertTablet(tabletPlan);
}
}
......@@ -26,7 +26,6 @@ import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.idtable.entry.TimeseriesID;
import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
......@@ -648,16 +647,6 @@ public class IDTableTest {
assertEquals(new TsPrimitiveType.TsLong(2L), cacheContainer.getCachedLast().getValue());
assertEquals(110L, cacheContainer.getCachedLast().getTimestamp());
// flush time
TimeseriesID timeseriesID =
new TimeseriesID(new PartialPath("root.laptop.d1.non_aligned_device.s1"));
idTable.updateLatestFlushTime(timeseriesID, 10L);
assertEquals(10L, idTable.getLatestFlushedTime(timeseriesID));
idTable.updateLatestFlushTime(timeseriesID, 8L);
assertEquals(10L, idTable.getLatestFlushedTime(timeseriesID));
idTable.updateLatestFlushTime(timeseriesID, 12L);
assertEquals(12L, idTable.getLatestFlushedTime(timeseriesID));
} catch (MetadataException e) {
e.printStackTrace();
fail("throw exception");
......
......@@ -69,6 +69,8 @@ public class QueryWithIDTableTest {
private String originalDeviceIDTransformationMethod = null;
private boolean isEnableIDTableLogFile = false;
Set<String> retSet =
new HashSet<>(
Arrays.asList(
......@@ -140,9 +142,11 @@ public class QueryWithIDTableTest {
isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
originalDeviceIDTransformationMethod =
IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
EnvironmentUtils.envSetUp();
}
......@@ -152,6 +156,7 @@ public class QueryWithIDTableTest {
IoTDBDescriptor.getInstance()
.getConfig()
.setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
EnvironmentUtils.cleanEnv();
}
......
......@@ -48,12 +48,6 @@ public class SchemaEntryTest {
schemaEntry.getCompressionType(),
TSFileDescriptor.getInstance().getConfig().getCompressor());
// flush time
schemaEntry.updateLastedFlushTime(100);
assertEquals(schemaEntry.getFlushTime(), 100);
schemaEntry.updateLastedFlushTime(50);
assertEquals(schemaEntry.getFlushTime(), 100);
// last cache
schemaEntry.updateCachedLast(
new TimeValuePair(100L, new TsPrimitiveType.TsLong(1L)), false, 0L);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册