提交 32130160 编写于 作者: U Ufuk Celebi

[FLINK-4150] [runtime] Don't clean up BlobStore on BlobServer shut down

The `BlobServer` acts as a local cache for uploaded BLOBs. The life-cycle of
each BLOB is bound to the life-cycle of the `BlobServer`. If the BlobServer
shuts down (on JobManager shut down), all local files will be removed.

With HA, BLOBs are persisted to another file system (e.g. HDFS) via the
`BlobStore` in order to have BLOBs available after a JobManager failure (or
shut down). These BLOBs are only allowed to be removed when the job that
requires them enters a globally terminal state (`FINISHED`, `CANCELLED`,
`FAILED`).

This commit removes the `BlobStore` clean up call from the `BlobServer`
shutdown. The `BlobStore` files will only be cleaned up via the
`BlobLibraryCacheManager`'s clean up task (periodically or on
BlobLibraryCacheManager shutdown). This means that there is a chance that
BLOBs will linger around after the job has terminated, if the job manager
fails before the clean up.

This closes #2256.
上级 2818ee00
......@@ -301,9 +301,6 @@ public class BlobServer extends Thread implements BlobService {
LOG.error("BLOB server failed to properly clean up its storage directory.");
}
// Clean up the recovery directory
blobStore.cleanUp();
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
// shutdown hook itself
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
......
......@@ -64,6 +64,9 @@ class BlobServerConnection extends Thread {
/** The BLOB server. */
private final BlobServer blobServer;
/** The HA blob store. */
private final BlobStore blobStore;
/**
* Creates a new BLOB connection for a client request
*
......@@ -80,6 +83,7 @@ class BlobServerConnection extends Thread {
this.clientSocket = clientSocket;
this.blobServer = blobServer;
this.blobStore = blobServer.getBlobStore();
}
// --------------------------------------------------------------------------------------------
......@@ -182,7 +186,7 @@ class BlobServerConnection extends Thread {
blobFile = this.blobServer.getStorageLocation(jobID, key);
if (!blobFile.exists()) {
blobServer.getBlobStore().get(jobID, key, blobFile);
blobStore.get(jobID, key, blobFile);
}
}
else if (contentAddressable == CONTENT_ADDRESSABLE) {
......@@ -190,7 +194,7 @@ class BlobServerConnection extends Thread {
blobFile = blobServer.getStorageLocation(key);
if (!blobFile.exists()) {
blobServer.getBlobStore().get(key, blobFile);
blobStore.get(key, blobFile);
}
}
else {
......@@ -320,7 +324,7 @@ class BlobServerConnection extends Thread {
Files.move(incomingFile, storageFile);
incomingFile = null;
blobServer.getBlobStore().put(storageFile, jobID, key);
blobStore.put(storageFile, jobID, key);
outputStream.write(RETURN_OKAY);
}
......@@ -330,7 +334,7 @@ class BlobServerConnection extends Thread {
Files.move(incomingFile, storageFile);
incomingFile = null;
blobServer.getBlobStore().put(storageFile, blobKey);
blobStore.put(storageFile, blobKey);
// Return computed key to client for validation
outputStream.write(RETURN_OKAY);
......@@ -390,7 +394,7 @@ class BlobServerConnection extends Thread {
throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
}
blobServer.getBlobStore().delete(key);
blobStore.delete(key);
}
else if (type == NAME_ADDRESSABLE) {
byte[] jidBytes = new byte[JobID.SIZE];
......@@ -404,7 +408,7 @@ class BlobServerConnection extends Thread {
throw new IOException("Cannot delete BLOB file " + blobFile.getAbsolutePath());
}
blobServer.getBlobStore().delete(jobID, key);
blobStore.delete(jobID, key);
}
else if (type == JOB_ID_SCOPE) {
byte[] jidBytes = new byte[JobID.SIZE];
......@@ -413,7 +417,7 @@ class BlobServerConnection extends Thread {
blobServer.deleteJobDirectory(jobID);
blobServer.getBlobStore().deleteAll(jobID);
blobStore.deleteAll(jobID);
}
else {
throw new IOException("Unrecognized addressing type: " + type);
......
......@@ -143,7 +143,17 @@ class FileSystemBlobStore implements BlobStore {
try {
LOG.debug("Deleting {}.", blobPath);
FileSystem.get(new URI(blobPath)).delete(new Path(blobPath), true);
FileSystem fs = FileSystem.get(new URI(blobPath));
Path path = new Path(blobPath);
fs.delete(path, true);
// send a call to delete the directory containing the file. This will
// fail (and be ignored) when some files still exist.
try {
fs.delete(path.getParent(), false);
fs.delete(new Path(basePath), false);
} catch (IOException ignored) {}
}
catch (Exception e) {
LOG.warn("Failed to delete blob at " + blobPath);
......
......@@ -77,7 +77,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
// Initializing the clean up task
this.cleanupTimer = new Timer(true);
this.cleanupTimer.schedule(this, cleanupInterval);
this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval);
}
// --------------------------------------------------------------------------------------------
......@@ -200,6 +200,12 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
/**
 * Shuts down this library cache manager.
 *
 * <p>Runs the clean up task one final time on a best-effort basis, then shuts
 * down the BLOB service and cancels the clean up timer. The timer is cancelled
 * even when shutting down the BLOB service fails.
 *
 * @throws IOException propagated from {@code blobService.shutdown()}
 */
@Override
public void shutdown() throws IOException {
	try {
		run();
	} catch (Throwable t) {
		// Best effort only: a failed final clean up must not prevent the shutdown.
		LOG.warn("Failed to run clean up task before shutdown", t);
	}
	try {
		blobService.shutdown();
	} finally {
		// Always stop the timer; otherwise an exception from the BLOB service
		// shutdown would leave the clean up timer thread running.
		cleanupTimer.cancel();
	}
}
......@@ -210,7 +216,6 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
@Override
public void run() {
synchronized (lockObject) {
Iterator<Map.Entry<BlobKey, Integer>> entryIter = blobKeyReferenceCounters.entrySet().iterator();
while (entryIter.hasNext()) {
......
......@@ -139,6 +139,12 @@ public class BlobRecoveryITCase {
assertEquals(expected[i], actual[j]);
}
}
// Remove again
client.delete(keys[0]);
client.delete(keys[1]);
client.delete(jobId[0], testKey[0]);
client.delete(jobId[1], testKey[1]);
}
finally {
for (BlobServer s : server) {
......
......@@ -136,6 +136,12 @@ public class BlobLibraryCacheRecoveryITCase {
assertEquals(0, fis.available());
}
// Remove blobs again
try (BlobClient client = new BlobClient(serverAddress[1])) {
client.delete(keys.get(0));
client.delete(keys.get(1));
}
}
finally {
for (BlobServer s : server) {
......
......@@ -58,4 +58,9 @@ public class TestingLeaderElectionService implements LeaderElectionService {
hasLeadership = false;
contender.revokeLeadership();
}
/**
 * Clears the leadership flag and drops the reference to the contender.
 */
public void reset() {
	this.hasLeadership = false;
	this.contender = null;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册