提交 971ec2b6 编写于 作者: J Jiang Tian 提交者: Xiangdong Huang

Add wal checking tool (#85)

* add wal checker

* fix show timeseries IT test
上级 cd8d5fdb
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.exception;
public class SysCheckException extends Exception {
public SysCheckException() {
}
public SysCheckException(String message) {
super(message);
}
public SysCheckException(String message, Throwable cause) {
super(message, cause);
}
public SysCheckException(Throwable cause) {
super(cause);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.tools;
import static org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode.WAL_FILE_NAME;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.iotdb.db.exception.SysCheckException;
import org.apache.iotdb.db.writelog.io.RAFLogReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* WalChecker verifies that whether all write ahead logs in the WAL folder are recognizable.
*/
public class WalChecker {
private static final Logger LOGGER = LoggerFactory.getLogger(WalChecker.class);
/**
* the root dir of wals, which should have wal directories of storage groups as its children.
*/
private String walFolder;
public WalChecker(String walFolder) {
this.walFolder = walFolder;
}
/**
* check the root wal dir and find the damaged files
* @return a list of damaged files.
* @throws SysCheckException if the root wal dir does not exist.
*/
public List<File> doCheck() throws SysCheckException {
File walFolderFile = new File(walFolder);
if(!walFolderFile.exists() || !walFolderFile.isDirectory()) {
throw new SysCheckException(String.format("%s is not a directory", walFolder));
}
File[] storageWalFolders = walFolderFile.listFiles();
if (storageWalFolders == null || storageWalFolders.length == 0) {
LOGGER.info("No sub-directories under the given directory, check ends");
return Collections.emptyList();
}
List<File> failedFiles = new ArrayList<>();
for (int dirIndex = 0; dirIndex < storageWalFolders.length; dirIndex++) {
File storageWalFolder = storageWalFolders[dirIndex];
LOGGER.debug("Checking the No.{} directory {}", dirIndex, storageWalFolder.getName());
File walFile = new File(storageWalFolder, WAL_FILE_NAME);
if (!walFile.exists()) {
LOGGER.debug("No wal file in this dir, skipping");
continue;
}
RAFLogReader logReader = null;
try {
logReader = new RAFLogReader(walFile);
while (logReader.hasNext()) {
logReader.next();
}
} catch (IOException e) {
failedFiles.add(walFile);
LOGGER.error("{} fails the check because", walFile.getAbsoluteFile(), e);
} finally {
if( logReader != null) {
logReader.close();
}
}
}
return failedFiles;
}
// a temporary method which should be in the integrated self-check module in the future
public static void report(List<File> failedFiles) {
if (failedFiles.isEmpty()) {
LOGGER.info("Check finished. There is no damaged file");
} else {
LOGGER.error("There are {} failed files. They are {}", failedFiles.size(), failedFiles);
}
}
/**
*
* @param args walRootDirectory
*/
public static void main(String[] args) throws SysCheckException {
if (args.length < 1) {
LOGGER.error("No enough args: require the walRootDirectory");
return;
}
WalChecker checker = new WalChecker(args[0]);
List<File> files = checker.doCheck();
report(files);
}
}
......@@ -20,12 +20,17 @@ package org.apache.iotdb.db.writelog.io;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
public interface ILogReader extends Iterator<PhysicalPlan> {
public interface ILogReader {
void open(File file) throws FileNotFoundException;
void close();
boolean hasNext() throws IOException;
PhysicalPlan next() throws IOException;
}
......@@ -48,42 +48,33 @@ public class RAFLogReader implements ILogReader {
}
@Override
public boolean hasNext() {
public boolean hasNext() throws IOException{
if (planBuffer != null) {
return true;
}
try {
if (logRaf.getFilePointer() + 12 > logRaf.length()) {
return false;
}
} catch (IOException e) {
logger.error("Cannot read from log file {}", filepath, e);
if (logRaf.getFilePointer() + 12 > logRaf.length()) {
return false;
}
try {
int logSize = logRaf.readInt();
if (logSize > bufferSize) {
bufferSize = logSize;
buffer = new byte[bufferSize];
}
final long checkSum = logRaf.readLong();
logRaf.read(buffer, 0, logSize);
checkSummer.reset();
checkSummer.update(buffer, 0, logSize);
if (checkSummer.getValue() != checkSum) {
return false;
}
PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(buffer);
planBuffer = plan;
return true;
} catch (IOException e) {
logger.error("Cannot read log file {}", filepath, e);
return false;
int logSize = logRaf.readInt();
if (logSize > bufferSize) {
bufferSize = logSize;
buffer = new byte[bufferSize];
}
final long checkSum = logRaf.readLong();
logRaf.read(buffer, 0, logSize);
checkSummer.reset();
checkSummer.update(buffer, 0, logSize);
if (checkSummer.getValue() != checkSum) {
throw new IOException("The check sum is incorrect!");
}
planBuffer = PhysicalPlanLogTransfer.logToOperator(buffer);
return true;
}
@Override
public PhysicalPlan next() {
public PhysicalPlan next() throws IOException {
if (!hasNext()){
throw new NoSuchElementException();
}
......
......@@ -259,15 +259,15 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
replayLog();
}
private int replayLogFile(File logFile) throws RecoverException {
private int replayLogFile(File logFile) throws RecoverException, IOException {
int failedCnt = 0;
if (logFile.exists()) {
try {
rafLogReader.open(logFile);
} catch (FileNotFoundException e) {
logger
.error("Log node {} cannot read old log file, because {}", writeLogNode.getIdentifier(),
e.getMessage());
.error("Log node {} cannot read old log file, because ", writeLogNode.getIdentifier(),
e);
throw new RecoverException("Cannot read old log file, recovery aborted.");
}
while (rafLogReader.hasNext()) {
......@@ -294,11 +294,19 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
File oldLogFile = new File(
writeLogNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME
+ ExclusiveWriteLogNode.OLD_SUFFIX);
failedEntryCnt += replayLogFile(oldLogFile);
try {
failedEntryCnt += replayLogFile(oldLogFile);
} catch (IOException e) {
throw new RecoverException(e);
}
// then replay new log
File newLogFile = new File(
writeLogNode.getLogDirectory() + File.separator + ExclusiveWriteLogNode.WAL_FILE_NAME);
failedEntryCnt += replayLogFile(newLogFile);
try {
failedEntryCnt += replayLogFile(newLogFile);
} catch (IOException e) {
throw new RecoverException(e);
}
// TODO : do we need to proceed if there are failed logs ?
if (failedEntryCnt > 0) {
throw new RecoverException(
......
......@@ -57,9 +57,9 @@ public enum PhysicalPlanCodec {
this.codec = codec;
}
public static PhysicalPlanCodec fromOpcode(int opcode) {
public static PhysicalPlanCodec fromOpcode(int opcode) throws IOException {
if (!codecMap.containsKey(opcode)) {
throw new UnsupportedOperationException(
throw new IOException(
"SystemLogOperator [" + opcode + "] is not supported. ");
}
return codecMap.get(opcode);
......
......@@ -27,7 +27,7 @@ public class PhysicalPlanLogTransfer {
private PhysicalPlanLogTransfer(){}
public static byte[] operatorToLog(PhysicalPlan plan) throws WALOverSizedException {
public static byte[] operatorToLog(PhysicalPlan plan) throws IOException {
Codec<PhysicalPlan> codec;
switch (plan.getOperatorType()) {
case INSERT:
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.tools;
import static org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode.WAL_FILE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.exception.SysCheckException;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.writelog.io.LogWriter;
import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
import org.junit.Test;
public class WalCheckerTest {
@Test
public void testNoDir() {
WalChecker checker = new WalChecker("no such dir");
boolean caught = false;
try {
checker.doCheck();
} catch (SysCheckException e) {
caught = true;
}
assertTrue(caught);
}
@Test
public void testEmpty() throws IOException, SysCheckException {
File tempRoot = new File("root");
tempRoot.mkdir();
try {
WalChecker checker = new WalChecker(tempRoot.getAbsolutePath());
assertTrue(checker.doCheck().isEmpty());
} finally {
FileUtils.deleteDirectory(tempRoot);
}
}
@Test
public void testNormalCheck() throws IOException, SysCheckException {
File tempRoot = new File("root");
tempRoot.mkdir();
try {
for (int i = 0; i < 5; i++) {
File subDir = new File(tempRoot, "storage_group" + i);
subDir.mkdir();
LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator
+ WAL_FILE_NAME);
List<byte[]> binaryPlans = new ArrayList<>();
String deviceId = "device1";
List<String> measurements = Arrays.asList("s1", "s2", "s3");
List<String> values = Arrays.asList("5", "6", "7");
for (int j = 0; j < 10; j++) {
binaryPlans.add(PhysicalPlanLogTransfer
.operatorToLog(new InsertPlan(deviceId, j, measurements, values)));
}
logWriter.write(binaryPlans);
logWriter.force();
logWriter.close();
}
WalChecker checker = new WalChecker(tempRoot.getAbsolutePath());
assertTrue(checker.doCheck().isEmpty());
} finally {
FileUtils.deleteDirectory(tempRoot);
}
}
@Test
public void testAbnormalCheck() throws IOException, SysCheckException {
File tempRoot = new File("root");
tempRoot.mkdir();
try {
for (int i = 0; i < 5; i++) {
File subDir = new File(tempRoot, "storage_group" + i);
subDir.mkdir();
LogWriter logWriter = new LogWriter(subDir.getPath() + File.separator
+ WAL_FILE_NAME);
List<byte[]> binaryPlans = new ArrayList<>();
String deviceId = "device1";
List<String> measurements = Arrays.asList("s1", "s2", "s3");
List<String> values = Arrays.asList("5", "6", "7");
for (int j = 0; j < 10; j++) {
binaryPlans.add(PhysicalPlanLogTransfer
.operatorToLog(new InsertPlan(deviceId, j, measurements, values)));
}
if (i > 2) {
binaryPlans.add("not a wal".getBytes());
}
logWriter.write(binaryPlans);
logWriter.force();
logWriter.close();
}
WalChecker checker = new WalChecker(tempRoot.getAbsolutePath());
assertEquals(2, checker.doCheck().size());
} finally {
FileUtils.deleteDirectory(tempRoot);
}
}
}
......@@ -216,7 +216,7 @@ public class PerformanceTest {
}
@Test
public void SQLEncodingComparisonTest() throws WALOverSizedException {
public void SQLEncodingComparisonTest() throws IOException {
String sql = "INSERT INTO root.logTestDevice(time,s1,s2,s3,s4) "
+ "VALUES (100,1.0,15,\"str\",false)";
InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100,
......
......@@ -42,7 +42,7 @@ public class LogWriterReaderTest {
List<PhysicalPlan> plans = new ArrayList<>();
@Before
public void prepare() throws WALOverSizedException {
public void prepare() throws IOException {
if (new File(filePath).exists()) {
new File(filePath).delete();
}
......
......@@ -113,7 +113,6 @@
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册