未验证 提交 2a103317 编写于 作者: T tangyoupeng 提交者: GitHub

Bench fsio (#464)

* hadoop: add multi-threaded support for fsio

* hadoop: make TestFSIO and TestDFSIO skip the random buffer fill by default (pass -randomBytes to write random data)

* hadoop: create executor pool threads as daemon threads

* hadoop: create executor pool threads as daemon threads
上级 2a3abb03
......@@ -79,7 +79,11 @@ public abstract class IOMapperBase<T> extends Configured
hostName = "localhost";
}
threadPerMap = conf.getInt("test.threadsPerMap", 1);
pool = Executors.newFixedThreadPool(threadPerMap);
pool = Executors.newFixedThreadPool(threadPerMap, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
});
}
@Override
......
......@@ -507,7 +507,11 @@ public class NNBench {
beforeRename = conf.getBoolean("test.nnbench.deleteBeforeRename", false);
threadsPerMap = conf.getInt("test.nnbench.threadsPerMap", 1);
executorService = Executors.newFixedThreadPool(threadsPerMap);
executorService = Executors.newFixedThreadPool(threadsPerMap, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
});
}
/**
......
......@@ -38,7 +38,6 @@ import java.io.*;
import java.text.DecimalFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.concurrent.ThreadLocalRandom;
......@@ -84,6 +83,7 @@ public class TestDFSIO implements Tool {
" [genericOptions]" +
" -read [-random | -backward | -skip [-skipSize Size]] |" +
" -write | -append | -truncate | -clean" +
" [-randomBytes]" +
" [-compression codecClassName]" +
" [-nrFiles N]" +
" [-size Size[B|KB|MB|GB|TB]]" +
......@@ -257,12 +257,15 @@ public class TestDFSIO implements Tool {
private abstract static class IOStatMapper extends IOMapperBase<Long> {
protected CompressionCodec compressionCodec;
private static final ThreadLocalRandom random = ThreadLocalRandom.current();
private boolean randomBytes;
IOStatMapper() {
}
public byte[] getBuffer() {
random.nextBytes(buffer.get());
if (randomBytes) {
random.nextBytes(buffer.get());
}
return buffer.get();
}
......@@ -275,6 +278,7 @@ public class TestDFSIO implements Tool {
} catch (IOException e) {
throw new RuntimeException("Cannot create file system.", e);
}
randomBytes = conf.getBoolean("test.randomBytes", false);
// grab compression
String compression = getConf().get("test.io.compression.class", null);
......@@ -678,6 +682,7 @@ public class TestDFSIO implements Tool {
String compressionClass = null;
boolean isSequential = false;
String version = TestDFSIO.class.getSimpleName() + ".1.8";
boolean randomBytes = false;
LOG.info(version);
if (args.length == 0) {
......@@ -690,6 +695,8 @@ public class TestDFSIO implements Tool {
testType = TestType.TEST_TYPE_READ;
} else if (args[i].equals("-write")) {
testType = TestType.TEST_TYPE_WRITE;
} else if (args[i].equals("-randomBytes")) {
randomBytes = true;
} else if (args[i].equals("-append")) {
testType = TestType.TEST_TYPE_APPEND;
} else if (args[i].equals("-random")) {
......@@ -738,6 +745,7 @@ public class TestDFSIO implements Tool {
skipSize = bufferSize;
LOG.info("nrFiles = " + nrFiles);
LOG.info("randomBytes = " + randomBytes);
LOG.info("nrBytes (MB) = " + toMB(nrBytes));
LOG.info("bufferSize = " + bufferSize);
if (skipSize > 0)
......@@ -755,6 +763,7 @@ public class TestDFSIO implements Tool {
config.setBoolean("dfs.support.append", true);
config.setInt("test.threadsPerMap", threadsPerMap);
config.set("test.basedir", baseDir);
config.setBoolean("test.randomBytes", randomBytes);
FileSystem fs = new Path(getBaseDir(config)).getFileSystem(config);
if (isSequential) {
......
......@@ -32,9 +32,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DecimalFormat;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
......@@ -46,15 +48,27 @@ public class TestFSIO {
private static final long MEGA = TestDFSIO.ByteMultiple.MB.value();
private Configuration config;
private static String baseDir = "/benchmarks/FSIO";
private static FileSystem fs;
private static int threads = 1;
private static int nrFiles = 1;
private static long size = 1;
private static int bufferSize = 1000000;
private static long skipSize;
private static String compression;
private static boolean randomBytes;
private static final String USAGE =
"Usage: " + TestFSIO.class.getSimpleName() +
" [genericOptions]" +
" -read [-random | -backward | -skip [-skipSize Size]] |" +
" -write | -append | -truncate | -clean" +
" [-randomBytes]" +
" [-compression codecClassName]" +
" [-nrFiles N]" +
" [-threads N]" +
" [-size Size[B|KB|MB|GB|TB]]" +
" [-resFile resultFileName] [-bufferSize Bytes]" +
" [-bufferSize Bytes]" +
" [-baseDir]";
private static enum TestType {
......@@ -85,28 +99,19 @@ public class TestFSIO {
static abstract class TaskBase implements Callable<Long> {
protected byte[] buffer;
protected int bufferSize;
protected FileSystem fs;
protected int id;
protected int threadId;
protected CompressionCodec compressionCodec;
protected long totalSize;
protected ThreadLocalRandom random = ThreadLocalRandom.current();
protected Configuration config;
public TaskBase(int id, Configuration config) {
this.id = id;
public TaskBase(int threadId, Configuration config) {
this.threadId = threadId;
this.config = config;
try {
this.fs = new Path(getBaseDir(config)).getFileSystem(config);
} catch (IOException e) {
throw new RuntimeException("Cannot create file system.", e);
}
bufferSize = config.getInt("test.io.file.buffer.size", 4096);
buffer = new byte[bufferSize];
totalSize = config.getLong("test.nrbytes", MEGA);
totalSize = size;
// grab compression
String compression = config.get("test.io.compression.class", null);
Class<? extends CompressionCodec> codec;
// try to initialize codec
......@@ -123,7 +128,9 @@ public class TestFSIO {
}
public byte[] fillBuffer() {
random.nextBytes(buffer);
if (randomBytes) {
random.nextBytes(buffer);
}
return buffer;
}
......@@ -136,22 +143,15 @@ public class TestFSIO {
* Create an input or output stream based on the specified file.
* Subclasses should override this method to provide an actual stream.
*/
public Closeable getIOStream(int id) throws IOException {
return null;
}
abstract Closeable getIOStream(Path path) throws IOException;
/**
* Collect stat data to be combined by a subsequent reducer.
*/
void collectStats(int id,
long execTime,
void collectStats(long execTime,
Long size) throws IOException {
float ioRateMbSec = (float) size * 1000 / (execTime * MEGA);
LOG.info("task " + id + " Number of bytes processed = " + size);
LOG.info("task " + id + " Exec time = " + execTime);
LOG.info("task " + id + " IO rate = " + ioRateMbSec);
Collector.INSTANCE.collectTasks(1);
Collector.INSTANCE.collectSize(size);
Collector.INSTANCE.collectExecTime(execTime);
Collector.INSTANCE.collectRate(ioRateMbSec * 1000);
......@@ -160,18 +160,22 @@ public class TestFSIO {
@Override
public Long call() throws Exception {
long execTime;
long size;
try (Closeable stream = getIOStream(id)) {
long execTime = 0;
long size = 0;
final Path taskPath = new Path(baseDir, String.valueOf(threadId));
fs.mkdirs(taskPath);
for (int i = 0; i < nrFiles; i++) {
Path path = new Path(taskPath, "file-" + i);
long tStart = System.currentTimeMillis();
size = doIO(id, stream);
try (Closeable stream = getIOStream(path)) {
size += doIO(threadId, stream);
} catch (IOException e) {
throw new RuntimeException(e);
}
long tEnd = System.currentTimeMillis();
execTime = tEnd - tStart;
} catch (IOException e) {
throw new RuntimeException(e);
execTime += (tEnd - tStart);
}
collectStats(id, execTime, size);
collectStats(execTime, size);
return size;
}
}
......@@ -181,7 +185,6 @@ public class TestFSIO {
AtomicLong execTime = new AtomicLong(0L);
AtomicLong size = new AtomicLong(0L);
AtomicInteger tasks = new AtomicInteger(0);
List<Double> rates = Collections.synchronizedList(new ArrayList<>());
List<Double> sqrates = Collections.synchronizedList(new ArrayList<>());
......@@ -196,10 +199,6 @@ public class TestFSIO {
this.size.getAndAdd(size);
}
void collectTasks(int task) {
this.tasks.getAndAdd(task);
}
void collectRate(double rate) {
this.rates.add(rate);
}
......@@ -216,10 +215,6 @@ public class TestFSIO {
return size.get();
}
public int getTasks() {
return tasks.get();
}
public double getRate() {
return rates.stream().reduce(Double::sum).orElse(0.0);
}
......@@ -236,13 +231,12 @@ public class TestFSIO {
}
@Override
public Closeable getIOStream(int id) throws IOException {
public Closeable getIOStream(Path path) throws IOException {
// open file
InputStream in = fs.open(new Path(getDataDir(config), "data_" + id));
InputStream in = fs.open(path);
if (compressionCodec != null) {
in = compressionCodec.createInputStream(in);
}
LOG.info("in = " + in.getClass().getName());
return in;
}
......@@ -250,7 +244,6 @@ public class TestFSIO {
Long doIO(int id, Closeable stream) throws IOException {
InputStream in = (InputStream) stream;
long actualSize = 0;
LOG.info("totalsize = " + totalSize);
while (actualSize < totalSize) {
int curSize = in.read(buffer, 0, bufferSize);
if (curSize < 0) {
......@@ -258,12 +251,11 @@ public class TestFSIO {
}
actualSize += curSize;
}
LOG.info("actualSize = " + actualSize);
return actualSize;
}
}
long readTest(FileSystem fs) throws IOException {
long readTest() throws IOException {
long tStart = System.currentTimeMillis();
try {
runTest(ReadTask.class);
......@@ -276,22 +268,18 @@ public class TestFSIO {
static class RandomReadTask extends TaskBase {
private ThreadLocalRandom rnd;
private long fileSize;
private long skipSize;
public RandomReadTask(Integer id, Configuration config) {
super(id, config);
rnd = ThreadLocalRandom.current();
skipSize = config.getLong("test.io.skip.size", 0);
}
@Override
public Closeable getIOStream(int id) throws IOException {
Path filePath = new Path(getDataDir(config), "data_" + id);
this.fileSize = fs.getFileStatus(filePath).getLen();
InputStream in = fs.open(filePath);
public Closeable getIOStream(Path path) throws IOException {
this.fileSize = fs.getFileStatus(path).getLen();
InputStream in = fs.open(path);
if (compressionCodec != null)
in = new FSDataInputStream(compressionCodec.createInputStream(in));
LOG.info("in = " + in.getClass().getName());
LOG.info("skipSize = " + skipSize);
return in;
}
......@@ -320,9 +308,7 @@ public class TestFSIO {
}
}
private long randomReadTest(FileSystem fs) throws IOException {
Path readDir = getRandomReadDir(config);
fs.delete(readDir, true);
private long randomReadTest() throws IOException {
long tStart = System.currentTimeMillis();
try {
runTest(RandomReadTask.class);
......@@ -344,8 +330,8 @@ public class TestFSIO {
}
@Override
public Closeable getIOStream(int id) throws IOException {
filePath = new Path(getDataDir(config), "data_" + id);
public Closeable getIOStream(Path path) throws IOException {
filePath = path;
fileSize = fs.getFileStatus(filePath).getLen();
return null;
}
......@@ -366,9 +352,7 @@ public class TestFSIO {
}
}
private long truncateTest(FileSystem fs) throws IOException {
Path TruncateDir = getTruncateDir(config);
fs.delete(TruncateDir, true);
private long truncateTest() throws IOException {
long tStart = System.currentTimeMillis();
try {
runTest(TruncateTask.class);
......@@ -385,14 +369,13 @@ public class TestFSIO {
}
@Override
public Closeable getIOStream(int id) throws IOException {
public Closeable getIOStream(Path path) throws IOException {
// create file
OutputStream out =
fs.create(new Path(getDataDir(config), "data_" + id), true, bufferSize);
fs.create(path, false, bufferSize);
if (compressionCodec != null) {
out = compressionCodec.createOutputStream(out);
}
LOG.info("task " + id + " out = " + out.getClass().getName());
return out;
}
......@@ -410,10 +393,9 @@ public class TestFSIO {
}
}
long writeTest(FileSystem fs) throws IOException {
Path writeDir = getWriteDir(config);
fs.delete(getDataDir(config), true);
fs.delete(writeDir, true);
long writeTest() throws IOException {
fs.delete(new Path(baseDir), true);
fs.mkdirs(new Path(baseDir));
long tStart = System.currentTimeMillis();
try {
runTest(WriteTask.class);
......@@ -430,13 +412,12 @@ public class TestFSIO {
}
@Override // IOMapperBase
public Closeable getIOStream(int id) throws IOException {
public Closeable getIOStream(Path path) throws IOException {
// open file for append
OutputStream out =
fs.append(new Path(getDataDir(config), "data_" + id), bufferSize);
fs.append(path, bufferSize);
if (compressionCodec != null)
out = compressionCodec.createOutputStream(out);
LOG.info("out = " + out.getClass().getName());
return out;
}
......@@ -453,9 +434,7 @@ public class TestFSIO {
}
}
private long appendTest(FileSystem fs) throws IOException {
Path appendDir = getAppendDir(config);
fs.delete(appendDir, true);
private long appendTest() throws IOException {
long tStart = System.currentTimeMillis();
try {
runTest(AppendTask.class);
......@@ -466,8 +445,11 @@ public class TestFSIO {
}
void runTest(Class<? extends Callable<Long>> clazz) throws Exception {
int threads = config.getInt("test.threadsPerMap", 1);
ExecutorService pool = Executors.newFixedThreadPool(threads);
ExecutorService pool = Executors.newFixedThreadPool(threads, r -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
});
List<Future<Long>> futures = new ArrayList<>(threads);
for (int i = 0; i < threads; i++) {
Callable<Long> t = clazz.getDeclaredConstructor(Integer.class, Configuration.class).newInstance(i, config);
......@@ -498,12 +480,6 @@ public class TestFSIO {
public int run(String[] args) throws IOException {
TestType testType = null;
int bufferSize = 1000000;
long nrBytes = 1 * MEGA;
int nrFiles = 1;
long skipSize = 0;
int threadsPerMap = 1;
String compressionClass = null;
String version = TestFSIO.class.getSimpleName() + ".1.8";
LOG.info(version);
......@@ -517,6 +493,8 @@ public class TestFSIO {
testType = TestType.TEST_TYPE_READ;
} else if (args[i].equals("-write")) {
testType = TestType.TEST_TYPE_WRITE;
} else if (args[i].equals("-randomBytes")) {
randomBytes = true;
} else if (args[i].equals("-append")) {
testType = TestType.TEST_TYPE_APPEND;
} else if (args[i].equals("-random")) {
......@@ -533,19 +511,19 @@ public class TestFSIO {
} else if (args[i].equals("-clean")) {
testType = TestType.TEST_TYPE_CLEANUP;
} else if (args[i].startsWith("-compression")) {
compressionClass = args[++i];
compression = args[++i];
} else if (args[i].equals("-nrFiles")) {
nrFiles = Integer.parseInt(args[++i]);
} else if (args[i].equals("-fileSize") || args[i].equals("-size")) {
nrBytes = TestDFSIO.parseSize(args[++i]);
size = TestDFSIO.parseSize(args[++i]);
} else if (args[i].equals("-skipSize")) {
skipSize = TestDFSIO.parseSize(args[++i]);
} else if (args[i].equals("-bufferSize")) {
bufferSize = Integer.parseInt(args[++i]);
} else if (args[i].equals("-baseDir")) {
baseDir = args[++i];
} else if (args[i].equals("-threadsPerMap")) {
threadsPerMap = Integer.parseInt(args[++i]);
} else if (args[i].equals("-threads")) {
threads = Integer.parseInt(args[++i]);
} else {
System.err.println("Illegal argument: " + args[i]);
return -1;
......@@ -559,105 +537,72 @@ public class TestFSIO {
skipSize = bufferSize;
LOG.info("nrFiles = " + nrFiles);
LOG.info("nrBytes (MB) = " + TestDFSIO.toMB(nrBytes));
LOG.info("randomBytes = " + randomBytes);
LOG.info("fileSize (MB) = " + TestDFSIO.toMB(size));
LOG.info("bufferSize = " + bufferSize);
if (skipSize > 0)
LOG.info("skipSize = " + skipSize);
LOG.info("baseDir = " + getBaseDir(config));
LOG.info("threadsPerMap = " + threadsPerMap);
LOG.info("baseDir = " + baseDir);
LOG.info("threads = " + threads);
if (compressionClass != null) {
config.set("test.io.compression.class", compressionClass);
LOG.info("compressionClass = " + compressionClass);
if (compression != null) {
LOG.info("compressionClass = " + compression);
}
config.setInt("test.io.file.buffer.size", bufferSize);
config.setLong("test.io.skip.size", skipSize);
config.setBoolean("dfs.support.append", true);
config.setInt("test.threadsPerMap", threadsPerMap);
config.setLong("test.nrbytes", nrBytes);
FileSystem fs = new Path(getBaseDir(config)).getFileSystem(config);
fs = new Path(baseDir).getFileSystem(config);
if (testType == TestType.TEST_TYPE_CLEANUP) {
cleanup(fs);
cleanup();
return 0;
}
createControlFile(fs, nrBytes, nrFiles);
long tStart = System.currentTimeMillis();
switch (testType) {
case TEST_TYPE_WRITE:
writeTest(fs);
writeTest();
break;
case TEST_TYPE_READ:
readTest(fs);
readTest();
break;
case TEST_TYPE_APPEND:
appendTest(fs);
appendTest();
break;
case TEST_TYPE_READ_RANDOM:
case TEST_TYPE_READ_BACKWARD:
case TEST_TYPE_READ_SKIP:
randomReadTest(fs);
randomReadTest();
break;
case TEST_TYPE_TRUNCATE:
truncateTest(fs);
truncateTest();
break;
default:
}
long execTime = System.currentTimeMillis() - tStart;
analyzeResult(fs, testType, execTime, null);
analyzeResult(testType, execTime);
return 0;
}
private static String getBaseDir(Configuration conf) {
return baseDir;
}
private static Path getWriteDir(Configuration conf) {
return new Path(getBaseDir(conf), "io_write");
}
private static Path getAppendDir(Configuration conf) {
return new Path(getBaseDir(conf), "io_append");
}
private static Path getRandomReadDir(Configuration conf) {
return new Path(getBaseDir(conf), "io_random_read");
}
private static Path getTruncateDir(Configuration conf) {
return new Path(getBaseDir(conf), "io_truncate");
}
private static Path getDataDir(Configuration conf) {
return new Path(getBaseDir(conf), "io_data");
}
void createControlFile(FileSystem fs, long nrBytes, int nrFiles) throws IOException {
}
void analyzeResult(FileSystem fs,
TestType testType,
long execTime,
String resFileName
void analyzeResult(
TestType testType,
long execTime
) throws IOException {
long tasks = Collector.INSTANCE.getTasks();
long size = Collector.INSTANCE.getSize();
long time = Collector.INSTANCE.getExecTime();
double rate = Collector.INSTANCE.getRate();
double sqrate = Collector.INSTANCE.getSqrate();
double med = rate / 1000 / tasks;
double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med * med));
double med = rate / 1000 / threads;
double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / threads - med * med));
DecimalFormat df = new DecimalFormat("#.##");
String resultLines[] = {
"----- TestClient ----- : " + testType,
" Date & time: " + new Date(System.currentTimeMillis()),
" Number of files: " + tasks,
" Number of threads: " + threads,
"Number files per thread: " + nrFiles,
" Total files: " + threads * nrFiles,
" Total MBytes processed: " + df.format(TestDFSIO.toMB(size)),
" Throughput mb/sec: " + df.format(TestDFSIO.toMB(size) / TestDFSIO.msToSecs(time)),
"Total Throughput mb/sec: " + df.format(med * tasks),
" Average IO rate mb/sec: " + df.format(med),
" Avg Throughput mb/sec: " + df.format(TestDFSIO.toMB(size) / TestDFSIO.msToSecs(time)),
"Total Throughput mb/sec: " + df.format(med * threads),
" IO rate std deviation: " + df.format(stdDev),
" Test exec time sec: " + df.format(TestDFSIO.msToSecs(execTime)),
""};
......@@ -667,9 +612,9 @@ public class TestFSIO {
}
}
private void cleanup(FileSystem fs)
private void cleanup()
throws IOException {
LOG.info("Cleaning up test files");
fs.delete(new Path(getBaseDir(config)), true);
fs.delete(new Path(baseDir), true);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册