diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/SysCheckException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/SysCheckException.java new file mode 100644 index 0000000000000000000000000000000000000000..640acae75c950997e6d98015f7c4e021418099e1 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/SysCheckException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.exception; + +public class SysCheckException extends Exception { + + public SysCheckException() { + } + + public SysCheckException(String message) { + super(message); + } + + public SysCheckException(String message, Throwable cause) { + super(message, cause); + } + + public SysCheckException(Throwable cause) { + super(cause); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/tools/WalChecker.java b/iotdb/src/main/java/org/apache/iotdb/db/tools/WalChecker.java new file mode 100644 index 0000000000000000000000000000000000000000..6bef7a57de497a56f82a56c8bc93c747e584da4a --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/tools/WalChecker.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.tools; + +import static org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode.WAL_FILE_NAME; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.iotdb.db.exception.SysCheckException; +import org.apache.iotdb.db.writelog.io.RAFLogReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * WalChecker verifies whether every write-ahead log (WAL) file under the given WAL root folder can be read through without errors. + */ +public class WalChecker { + + private static final Logger LOGGER = LoggerFactory.getLogger(WalChecker.class); + + /** + * The root directory of the WALs; its children are expected to be the WAL directories of the storage groups. + */ + private String walFolder; + + public WalChecker(String walFolder) { + this.walFolder = walFolder; + } + + /** + * Check every sub-directory of the root WAL directory and collect the WAL files that cannot be read through. + * @return a list of damaged files (empty when none is found or the root directory has no children). + * @throws SysCheckException if the root wal dir does not exist or is not a directory. 
+ */ + public List doCheck() throws SysCheckException { + File walFolderFile = new File(walFolder); + if(!walFolderFile.exists() || !walFolderFile.isDirectory()) { + throw new SysCheckException(String.format("%s is not a directory", walFolder)); + } + + File[] storageWalFolders = walFolderFile.listFiles(); + if (storageWalFolders == null || storageWalFolders.length == 0) { + LOGGER.info("No sub-directories under the given directory, check ends"); + return Collections.emptyList(); + } + + List failedFiles = new ArrayList<>(); + for (int dirIndex = 0; dirIndex < storageWalFolders.length; dirIndex++) { + File storageWalFolder = storageWalFolders[dirIndex]; + LOGGER.debug("Checking the No.{} directory {}", dirIndex, storageWalFolder.getName()); + File walFile = new File(storageWalFolder, WAL_FILE_NAME); + if (!walFile.exists()) { + LOGGER.debug("No wal file in this dir, skipping"); + continue; + } + + RAFLogReader logReader = null; + try { + logReader = new RAFLogReader(walFile); + while (logReader.hasNext()) { + logReader.next(); + } + } catch (IOException | RuntimeException e) { /* a corrupt entry can also surface as an unchecked exception (e.g. IndexOutOfBoundsException from a negative entry length); record the file as damaged instead of aborting the whole scan */ + failedFiles.add(walFile); + LOGGER.error("{} fails the check because", walFile.getAbsoluteFile(), e); + } finally { + if( logReader != null) { + logReader.close(); + } + } + } + return failedFiles; + } + + // a temporary method which should be in the integrated self-check module in the future + public static void report(List failedFiles) { + if (failedFiles.isEmpty()) { + LOGGER.info("Check finished. There is no damaged file"); + } else { + LOGGER.error("There are {} failed files. 
They are {}", failedFiles.size(), failedFiles); + } + } + + /** + * + * @param args walRootDirectory + */ + public static void main(String[] args) throws SysCheckException { + if (args.length < 1) { + LOGGER.error("No enough args: require the walRootDirectory"); + return; + } + + WalChecker checker = new WalChecker(args[0]); + List files = checker.doCheck(); + report(files); + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java index e0d52758e6a8ce17b7a0fd074a32c4213a4fb7b3..55e30a8472d4772b059b109fdbc6ec8bc8c40522 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogReader.java @@ -20,12 +20,17 @@ package org.apache.iotdb.db.writelog.io; import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Iterator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -public interface ILogReader extends Iterator { +public interface ILogReader { void open(File file) throws FileNotFoundException; void close(); + + boolean hasNext() throws IOException; + + PhysicalPlan next() throws IOException; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java index 42e320e5899c0cfb46bf506031d892ca85e0d333..6700805f02dcf5b5a6284ae3537634eb57405450 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/RAFLogReader.java @@ -48,42 +48,33 @@ public class RAFLogReader implements ILogReader { } @Override - public boolean hasNext() { + public boolean hasNext() throws IOException{ if (planBuffer != null) { return true; } - try { - if (logRaf.getFilePointer() + 12 > logRaf.length()) { - return false; - } - } catch (IOException e) { - logger.error("Cannot read from log file 
{}", filepath, e); + if (logRaf.getFilePointer() + 12 > logRaf.length()) { return false; } - try { - int logSize = logRaf.readInt(); - if (logSize > bufferSize) { - bufferSize = logSize; - buffer = new byte[bufferSize]; - } - final long checkSum = logRaf.readLong(); - logRaf.read(buffer, 0, logSize); - checkSummer.reset(); - checkSummer.update(buffer, 0, logSize); - if (checkSummer.getValue() != checkSum) { - return false; - } - PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(buffer); - planBuffer = plan; - return true; - } catch (IOException e) { - logger.error("Cannot read log file {}", filepath, e); - return false; + + int logSize = logRaf.readInt(); + if (logSize > bufferSize) { + bufferSize = logSize; + buffer = new byte[bufferSize]; + } + final long checkSum = logRaf.readLong(); + logRaf.readFully(buffer, 0, logSize); /* readFully, not read: a short read would leave bytes of an earlier entry in the reused buffer; a truncated entry now raises EOFException */ + checkSummer.reset(); + checkSummer.update(buffer, 0, logSize); + if (checkSummer.getValue() != checkSum) { + throw new IOException("The check sum is incorrect!"); } + planBuffer = PhysicalPlanLogTransfer.logToOperator(buffer); + return true; } @Override - public PhysicalPlan next() { + public PhysicalPlan next() throws IOException { if (!hasNext()){ throw new NoSuchElementException(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java index 56db00a0f53244d3b457393cbb520aa21f68d388..15e88ddc6f9d4757798f34f94c03b05840c74229 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java @@ -259,15 +259,15 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer { replayLog(); } - private int replayLogFile(File logFile) throws RecoverException { + private int replayLogFile(File logFile) throws RecoverException, IOException { int failedCnt = 0; 
if (logFile.exists()) { try { rafLogReader.open(logFile); } catch (FileNotFoundException e) { logger - .error("Log node {} cannot read old log file, because {}", writeLogNode.getIdentifier(), - e.getMessage()); + .error("Log node {} cannot read old log file, because ", writeLogNode.getIdentifier(), + e); throw new RecoverException("Cannot read old log file, recovery aborted."); } while (rafLogReader.hasNext()) { @@ -294,11 +294,19 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer { File oldLogFile = new File( writeLogNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME + ExclusiveWriteLogNode.OLD_SUFFIX); - failedEntryCnt += replayLogFile(oldLogFile); + try { + failedEntryCnt += replayLogFile(oldLogFile); + } catch (IOException e) { + throw new RecoverException(e); + } // then replay new log File newLogFile = new File( writeLogNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME); - failedEntryCnt += replayLogFile(newLogFile); + try { + failedEntryCnt += replayLogFile(newLogFile); + } catch (IOException e) { + throw new RecoverException(e); + } // TODO : do we need to proceed if there are failed logs ? 
if (failedEntryCnt > 0) { throw new RecoverException( diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java index 54ba9e5c5394c172ac537e1f871cff6dca43f3f1..b611a4a85101ece148fabd1d25d36410ec8b77b6 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanCodec.java @@ -57,9 +57,9 @@ public enum PhysicalPlanCodec { this.codec = codec; } - public static PhysicalPlanCodec fromOpcode(int opcode) { + public static PhysicalPlanCodec fromOpcode(int opcode) throws IOException { if (!codecMap.containsKey(opcode)) { - throw new UnsupportedOperationException( + throw new IOException( "SystemLogOperator [" + opcode + "] is not supported. "); } return codecMap.get(opcode); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java index 4fb4792c42a68131cf41306cf0197c64abca75b5..76a6e46a839d402a8babd1a846e0e242cfed5bef 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/transfer/PhysicalPlanLogTransfer.java @@ -27,7 +27,7 @@ public class PhysicalPlanLogTransfer { private PhysicalPlanLogTransfer(){} - public static byte[] operatorToLog(PhysicalPlan plan) throws WALOverSizedException { + public static byte[] operatorToLog(PhysicalPlan plan) throws IOException { Codec codec; switch (plan.getOperatorType()) { case INSERT: diff --git a/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..0c3ca290bc664bfef5cc4a0c7df2f56c4853cfd8 --- /dev/null +++ 
b/iotdb/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.tools; + +import static org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode.WAL_FILE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.iotdb.db.exception.SysCheckException; +import org.apache.iotdb.db.qp.physical.crud.InsertPlan; +import org.apache.iotdb.db.writelog.io.LogWriter; +import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer; +import org.junit.Test; + +public class WalCheckerTest { + + @Test + public void testNoDir() { + WalChecker checker = new WalChecker("no such dir"); + boolean caught = false; + try { + checker.doCheck(); + } catch (SysCheckException e) { + caught = true; + } + assertTrue(caught); + } + + @Test + public void testEmpty() throws IOException, SysCheckException { + File tempRoot = new File("root"); + tempRoot.mkdir(); + + try { + WalChecker checker = new 
WalChecker(tempRoot.getAbsolutePath()); + assertTrue(checker.doCheck().isEmpty()); + } finally { + FileUtils.deleteDirectory(tempRoot); + } + } + + @Test + public void testNormalCheck() throws IOException, SysCheckException { + File tempRoot = new File("root"); + tempRoot.mkdir(); + + try { + for (int i = 0; i < 5; i++) { + File subDir = new File(tempRoot, "storage_group" + i); + subDir.mkdir(); + LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator + + WAL_FILE_NAME); + + List binaryPlans = new ArrayList<>(); + String deviceId = "device1"; + List measurements = Arrays.asList("s1", "s2", "s3"); + List values = Arrays.asList("5", "6", "7"); + for (int j = 0; j < 10; j++) { + binaryPlans.add(PhysicalPlanLogTransfer + .operatorToLog(new InsertPlan(deviceId, j, measurements, values))); + } + logWriter.write(binaryPlans); + logWriter.force(); + + logWriter.close(); + } + + WalChecker checker = new WalChecker(tempRoot.getAbsolutePath()); + assertTrue(checker.doCheck().isEmpty()); + } finally { + FileUtils.deleteDirectory(tempRoot); + } + } + + @Test + public void testAbnormalCheck() throws IOException, SysCheckException { + File tempRoot = new File("root"); + tempRoot.mkdir(); + + try { + for (int i = 0; i < 5; i++) { + File subDir = new File(tempRoot, "storage_group" + i); + subDir.mkdir(); + LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator + + WAL_FILE_NAME); + + List binaryPlans = new ArrayList<>(); + String deviceId = "device1"; + List measurements = Arrays.asList("s1", "s2", "s3"); + List values = Arrays.asList("5", "6", "7"); + for (int j = 0; j < 10; j++) { + binaryPlans.add(PhysicalPlanLogTransfer + .operatorToLog(new InsertPlan(deviceId, j, measurements, values))); + } + if (i > 2) { + binaryPlans.add("not a wal".getBytes()); + } + logWriter.write(binaryPlans); + logWriter.force(); + + logWriter.close(); + } + + WalChecker checker = new WalChecker(tempRoot.getAbsolutePath()); + assertEquals(2, checker.doCheck().size()); + } 
finally { + FileUtils.deleteDirectory(tempRoot); + } + } +} diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java index e3b81c4282f6b15801254977222877318a320d4d..ce0af2955de7cb1325aae6ca439475300770a5d8 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/PerformanceTest.java @@ -216,7 +216,7 @@ public class PerformanceTest { } @Test - public void SQLEncodingComparisonTest() throws WALOverSizedException { + public void SQLEncodingComparisonTest() throws IOException { String sql = "INSERT INTO root.logTestDevice(time,s1,s2,s3,s4) " + "VALUES (100,1.0,15,\"str\",false)"; InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100, diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java index 4ba32ef3bbda9de58f1af86f628d8835a84cc3f0..d68478ee5b1d4d4a24a18fd81c455e64a08e84fb 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/io/LogWriterReaderTest.java @@ -42,7 +42,7 @@ public class LogWriterReaderTest { List plans = new ArrayList<>(); @Before - public void prepare() throws WALOverSizedException { + public void prepare() throws IOException { if (new File(filePath).exists()) { new File(filePath).delete(); } diff --git a/tsfile/pom.xml b/tsfile/pom.xml index 53ef892ded42201aacc6d3afe1fae096aa92e799..5749bc5e6d9d421a41fa54ca8b2f98aeab09fae9 100644 --- a/tsfile/pom.xml +++ b/tsfile/pom.xml @@ -113,7 +113,6 @@ -