提交 e1e6e063 编写于 作者: R redestad

8222532: (zipfs) Performance regression when writing ZipFileSystem entries in parallel

Reviewed-by: lancea, clanger, alanb
上级 0e32bf8e
......@@ -591,11 +591,11 @@ class ZipFileSystem extends FileSystem {
throw new IllegalArgumentException("APPEND + TRUNCATE_EXISTING not allowed");
}
// Returns an output SeekableByteChannel for either
// (1) writing the contents of a new entry, if the entry doesn't exit, or
// (2) updating/replacing the contents of an existing entry.
// Note: The content is not compressed.
// Note: The content of the channel is not compressed until the
// channel is closed
private class EntryOutputChannel extends ByteArrayChannel {
Entry e;
......@@ -614,19 +614,19 @@ class ZipFileSystem extends FileSystem {
@Override
public void close() throws IOException {
e.bytes = toByteArray();
e.size = e.bytes.length;
e.crc = -1;
// will update the entry
try (OutputStream os = getOutputStream(e)) {
os.write(toByteArray());
}
super.close();
update(e);
}
}
private int getCompressMethod(FileAttribute<?>... attrs) {
private int getCompressMethod() {
return defaultMethod;
}
// Returns a Writable/ReadByteChannel for now. Might consdier to use
// Returns a Writable/ReadByteChannel for now. Might consider to use
// newFileChannel() instead, which dump the entry data into a regular
// file on the default file system and create a FileChannel on top of
// it.
......@@ -639,10 +639,9 @@ class ZipFileSystem extends FileSystem {
if (options.contains(StandardOpenOption.WRITE) ||
options.contains(StandardOpenOption.APPEND)) {
checkWritable();
beginRead(); // only need a readlock, the "update()" will obtain
// thewritelock when the channel is closed
beginRead(); // only need a read lock, the "update()" will obtain
// the write lock when the channel is closed
try {
ensureOpen();
Entry e = getEntry(path);
if (e != null) {
if (e.isDir() || options.contains(CREATE_NEW))
......@@ -667,8 +666,7 @@ class ZipFileSystem extends FileSystem {
throw new NoSuchFileException(getString(path));
checkParents(path);
return new EntryOutputChannel(
new Entry(path, Entry.NEW, false, getCompressMethod(attrs)));
new Entry(path, Entry.NEW, false, getCompressMethod()));
} finally {
endRead();
}
......@@ -735,7 +733,7 @@ class ZipFileSystem extends FileSystem {
final Entry u = isFCH ? e : new Entry(path, tmpfile, Entry.FILECH);
if (forWrite) {
u.flag = FLAG_DATADESCR;
u.method = getCompressMethod(attrs);
u.method = getCompressMethod();
}
// is there a better way to hook into the FileChannel's close method?
return new FileChannel() {
......@@ -836,7 +834,11 @@ class ZipFileSystem extends FileSystem {
// the outstanding input streams that need to be closed
private Set<InputStream> streams =
Collections.synchronizedSet(new HashSet<InputStream>());
Collections.synchronizedSet(new HashSet<>());
// the ex-channel and ex-path that need to close when their outstanding
// input streams are all closed by the obtainers.
private Set<ExistingChannelCloser> exChClosers = new HashSet<>();
private Set<Path> tmppaths = Collections.synchronizedSet(new HashSet<Path>());
private Path getTempPathForEntry(byte[] path) throws IOException {
......@@ -1193,25 +1195,20 @@ class ZipFileSystem extends FileSystem {
return written;
}
private long writeEntry(Entry e, OutputStream os, byte[] buf)
private long writeEntry(Entry e, OutputStream os)
throws IOException {
if (e.bytes == null && e.file == null) // dir, 0-length data
return 0;
long written = 0;
try (OutputStream os2 = e.method == METHOD_STORED ?
new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
if (e.bytes != null) { // in-memory
os2.write(e.bytes, 0, e.bytes.length);
} else if (e.file != null) { // tmp file
if (e.type == Entry.NEW || e.type == Entry.FILECH) {
try (InputStream is = Files.newInputStream(e.file)) {
is.transferTo(os2);
}
}
Files.delete(e.file);
tmppaths.remove(e.file);
if (e.crc != 0 && e.csize > 0) {
// pre-compressed entry, write directly to output stream
writeTo(e, os);
} else {
try (OutputStream os2 = (e.method == METHOD_STORED) ?
new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
writeTo(e, os2);
}
}
written += e.csize;
......@@ -1221,18 +1218,38 @@ class ZipFileSystem extends FileSystem {
return written;
}
private void writeTo(Entry e, OutputStream os) throws IOException {
if (e.bytes != null) {
os.write(e.bytes, 0, e.bytes.length);
} else if (e.file != null) {
if (e.type == Entry.NEW || e.type == Entry.FILECH) {
try (InputStream is = Files.newInputStream(e.file)) {
is.transferTo(os);
}
}
Files.delete(e.file);
tmppaths.remove(e.file);
}
}
// sync the zip file system, if there is any udpate
private void sync() throws IOException {
// check ex-closer
if (!exChClosers.isEmpty()) {
for (ExistingChannelCloser ecc : exChClosers) {
if (ecc.closeAndDeleteIfDone()) {
exChClosers.remove(ecc);
}
}
}
if (!hasUpdate)
return;
Path tmpFile = createTempFileInSameDirectoryAs(zfpath);
try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE)))
{
try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE))) {
ArrayList<Entry> elist = new ArrayList<>(inodes.size());
long written = 0;
byte[] buf = new byte[8192];
Entry e = null;
byte[] buf = null;
Entry e;
// write loc
for (IndexNode inode : inodes.values()) {
......@@ -1245,11 +1262,13 @@ class ZipFileSystem extends FileSystem {
// LOC in new file and simply copy the rest (data and
// ext) without enflating/deflating from the old zip
// file LOC entry.
if (buf == null)
buf = new byte[8192];
written += copyLOCEntry(e, true, os, written, buf);
} else { // NEW, FILECH or CEN
e.locoff = written;
written += e.writeLOC(os); // write loc header
written += writeEntry(e, os, buf);
written += writeEntry(e, os);
}
elist.add(e);
} catch (IOException x) {
......@@ -1265,6 +1284,8 @@ class ZipFileSystem extends FileSystem {
}
e = Entry.readCEN(this, inode);
try {
if (buf == null)
buf = new byte[8192];
written += copyLOCEntry(e, false, os, written, buf);
elist.add(e);
} catch (IOException x) {
......@@ -1282,9 +1303,23 @@ class ZipFileSystem extends FileSystem {
end.cenlen = written - end.cenoff;
end.write(os, written, forceEnd64);
}
if (!streams.isEmpty()) {
//
// There are outstanding input streams open on existing "ch",
// so, don't close the "cha" and delete the "file for now, let
// the "ex-channel-closer" to handle them
Path path = createTempFileInSameDirectoryAs(zfpath);
ExistingChannelCloser ecc = new ExistingChannelCloser(path,
ch,
streams);
Files.move(zfpath, path, REPLACE_EXISTING);
exChClosers.add(ecc);
streams = Collections.synchronizedSet(new HashSet<>());
} else {
ch.close();
Files.delete(zfpath);
}
ch.close();
Files.delete(zfpath);
Files.move(tmpFile, zfpath, REPLACE_EXISTING);
hasUpdate = false; // clear
}
......@@ -1342,11 +1377,15 @@ class ZipFileSystem extends FileSystem {
} else {
os = new ByteArrayOutputStream((e.size > 0)? (int)e.size : 8192);
}
return new EntryOutputStream(e, os);
if (e.method == METHOD_DEFLATED) {
return new DeflatingEntryOutputStream(e, os);
} else {
return new EntryOutputStream(e, os);
}
}
private class EntryOutputStream extends FilterOutputStream {
private Entry e;
private final Entry e;
private long written;
private boolean isClosed;
......@@ -1383,13 +1422,56 @@ class ZipFileSystem extends FileSystem {
}
}
// Output stream returned when writing "deflated" entries into memory,
// to enable eager (possibly parallel) deflation and reduce memory required.
private class DeflatingEntryOutputStream extends DeflaterOutputStream {
private final CRC32 crc;
private final Entry e;
private boolean isClosed;
DeflatingEntryOutputStream(Entry e, OutputStream os) throws IOException {
super(os, getDeflater());
this.e = Objects.requireNonNull(e, "Zip entry is null");
this.crc = new CRC32();
}
@Override
public synchronized void write(int b) throws IOException {
super.write(b);
crc.update(b);
}
@Override
public synchronized void write(byte b[], int off, int len)
throws IOException {
super.write(b, off, len);
crc.update(b, off, len);
}
@Override
public synchronized void close() throws IOException {
if (isClosed)
return;
isClosed = true;
finish();
e.size = def.getBytesRead();
e.csize = def.getBytesWritten();
e.crc = crc.getValue();
if (out instanceof ByteArrayOutputStream)
e.bytes = ((ByteArrayOutputStream)out).toByteArray();
super.close();
update(e);
releaseDeflater(def);
}
}
// Wrapper output stream class to write out a "stored" entry.
// (1) this class does not close the underlying out stream when
// being closed.
// (2) no need to be "synchronized", only used by sync()
private class EntryOutputStreamCRC32 extends FilterOutputStream {
private Entry e;
private CRC32 crc;
private final CRC32 crc;
private final Entry e;
private long written;
private boolean isClosed;
......@@ -1429,8 +1511,8 @@ class ZipFileSystem extends FileSystem {
// being closed.
// (2) no need to be "synchronized", only used by sync()
private class EntryOutputStreamDef extends DeflaterOutputStream {
private CRC32 crc;
private Entry e;
private final CRC32 crc;
private final Entry e;
private boolean isClosed;
EntryOutputStreamDef(Entry e, OutputStream os) throws IOException {
......@@ -1462,14 +1544,12 @@ class ZipFileSystem extends FileSystem {
private InputStream getInputStream(Entry e)
throws IOException
{
InputStream eis = null;
InputStream eis;
if (e.type == Entry.NEW) {
// now bytes & file is uncompressed.
if (e.bytes != null)
return new ByteArrayInputStream(e.bytes);
eis = new ByteArrayInputStream(e.bytes);
else if (e.file != null)
return Files.newInputStream(e.file);
eis = Files.newInputStream(e.file);
else
throw new ZipException("update entry data is missing");
} else if (e.type == Entry.FILECH) {
......@@ -1570,7 +1650,7 @@ class ZipFileSystem extends FileSystem {
len = (int) rem;
}
// readFullyAt()
long n = 0;
long n;
ByteBuffer bb = ByteBuffer.wrap(b);
bb.position(off);
bb.limit(off + len);
......@@ -1896,7 +1976,7 @@ class ZipFileSystem extends FileSystem {
this.type = type;
}
Entry (Entry e, int type) {
Entry(Entry e, int type) {
name(e.name);
this.isdir = e.isdir;
this.version = e.version;
......@@ -1919,7 +1999,7 @@ class ZipFileSystem extends FileSystem {
this.type = type;
}
Entry (byte[] name, Path file, int type) {
Entry(byte[] name, Path file, int type) {
this(name, type, false, METHOD_STORED);
this.file = file;
}
......@@ -2415,6 +2495,36 @@ class ZipFileSystem extends FileSystem {
}
}
private static class ExistingChannelCloser {
private final Path path;
private final SeekableByteChannel ch;
private final Set<InputStream> streams;
ExistingChannelCloser(Path path,
SeekableByteChannel ch,
Set<InputStream> streams) {
this.path = path;
this.ch = ch;
this.streams = streams;
}
/**
* If there are no more outstanding streams, close the channel and
* delete the backing file
*
* @return true if we're done and closed the backing file,
* otherwise false
* @throws IOException
*/
public boolean closeAndDeleteIfDone() throws IOException {
if (streams.isEmpty()) {
ch.close();
Files.delete(path);
return true;
}
return false;
}
}
// ZIP directory has two issues:
// (1) ZIP spec does not require the ZIP file to include
// directory entry
......
/*
* Copyright (c) 2009, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2009, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
......@@ -101,7 +101,7 @@ public class ZipFileSystemProvider extends FileSystemProvider {
if (filesystems.containsKey(realPath))
throw new FileSystemAlreadyExistsException();
}
ZipFileSystem zipfs = null;
ZipFileSystem zipfs;
try {
if (env.containsKey("multi-release")) {
zipfs = new JarFileSystem(this, path, env);
......@@ -128,13 +128,13 @@ public class ZipFileSystemProvider extends FileSystemProvider {
throws IOException
{
ensureFile(path);
try {
ZipFileSystem zipfs;
if (env.containsKey("multi-release")) {
zipfs = new JarFileSystem(this, path, env);
} else {
zipfs = new ZipFileSystem(this, path, env);
}
try {
ZipFileSystem zipfs;
if (env.containsKey("multi-release")) {
zipfs = new JarFileSystem(this, path, env);
} else {
zipfs = new ZipFileSystem(this, path, env);
}
return zipfs;
} catch (ZipException ze) {
String pname = path.toString();
......
......@@ -676,7 +676,7 @@ final class ZipPath implements Path {
@Override
public Iterator<Path> iterator() {
return new Iterator<Path>() {
return new Iterator<>() {
private int i = 0;
@Override
......@@ -746,8 +746,8 @@ final class ZipPath implements Path {
void setAttribute(String attribute, Object value, LinkOption... options)
throws IOException
{
String type = null;
String attr = null;
String type;
String attr;
int colonPos = attribute.indexOf(':');
if (colonPos == -1) {
type = "basic";
......@@ -772,8 +772,8 @@ final class ZipPath implements Path {
throws IOException
{
String view = null;
String attrs = null;
String view;
String attrs;
int colonPos = attributes.indexOf(':');
if (colonPos == -1) {
view = "basic";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册