Commit ad3c9a97 authored by Josh Rosen

[SPARK-13695] Don't cache MEMORY_AND_DISK blocks as bytes in memory after spills

When a cached block is spilled to disk and read back in serialized form (i.e. as bytes), the current BlockManager implementation will attempt to re-insert the serialized block into the MemoryStore even if the block's storage level requests deserialized caching.

This behavior adds some complexity to the MemoryStore but offers little performance benefit, and I'd like to remove it in order to simplify a larger refactoring patch. Therefore, this patch changes the behavior so that disk store reads cache bytes in the memory store only for blocks with serialized storage levels.
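To make the rule change concrete, here is a minimal Scala sketch of the old and new predicates. This is illustrative only, not the actual BlockManager code: `Level` and `asBlockResult` are simplified stand-ins for `StorageLevel` and the flag indicating whether the caller wants deserialized values rather than raw bytes.

```scala
// Sketch of the "should we cache the bytes back into memory?" decision
// made after a spilled block is read from disk.
object CacheBackDecision {
  final case class Level(deserialized: Boolean)

  // Old rule: cache bytes if the level is serialized OR the caller asked
  // for raw bytes -- so a MEMORY_AND_DISK block could be cached as bytes,
  // mismatching its storage level.
  def oldShouldCacheBytes(level: Level, asBlockResult: Boolean): Boolean =
    !level.deserialized || !asBlockResult

  // New rule: cache bytes only if the level is serialized AND the caller
  // asked for raw bytes, so the cached form always matches the level.
  def newShouldCacheBytes(level: Level, asBlockResult: Boolean): Boolean =
    !level.deserialized && !asBlockResult

  def main(args: Array[String]): Unit = {
    // Mismatched case: deserialized level (e.g. MEMORY_AND_DISK) read as
    // raw bytes. The old rule cached the bytes back; the new rule does not.
    assert(oldShouldCacheBytes(Level(deserialized = true), asBlockResult = false))
    assert(!newShouldCacheBytes(Level(deserialized = true), asBlockResult = false))

    // Matched case: serialized level (e.g. MEMORY_AND_DISK_SER) read as
    // raw bytes. Both the old and new rules cache the bytes back.
    assert(oldShouldCacheBytes(Level(deserialized = false), asBlockResult = false))
    assert(newShouldCacheBytes(Level(deserialized = false), asBlockResult = false))
  }
}
```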

There are two places where we request serialized bytes from the BlockStore:

1. getLocalBytes(), which is only called when reading local copies of TorrentBroadcast pieces. Broadcast pieces are always cached using a serialized storage level, so this won't lead to a mismatch in serialization forms if spilled bytes read from disk are cached as bytes in the memory store.
2. the non-shuffle-block branch in getBlockData(), which is called only by the NettyBlockRpcServer when responding to requests to read remote blocks. Caching the serialized bytes in memory helps only if those bytes are read again before they're evicted, which seems unlikely because remote reads of non-broadcast cached blocks are rare. Caching bytes that are this unlikely to be read is harmful if it risks evicting blocks cached in their expected serialized or deserialized forms, since those blocks are more likely to be read by local computation.

Given the argument above, I think this change is unlikely to cause performance regressions.
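As a cross-check, the expected end state after a spill and re-read can be written as a tiny predicate. This hedged sketch (names are illustrative) mirrors the `dataShouldHaveBeenCachedBackIntoMemory` check in the updated BlockManagerSuite shown below:

```scala
// After a block spills to disk and is read back, it should re-enter the
// MemoryStore only when the form that was read matches the form that the
// block's storage level requests.
object RecacheContract {
  def shouldRecache(levelDeserialized: Boolean, readAsBytes: Boolean): Boolean =
    if (levelDeserialized) !readAsBytes else readAsBytes

  def main(args: Array[String]): Unit = {
    assert(shouldRecache(levelDeserialized = true, readAsBytes = false))   // MEMORY_AND_DISK read as values
    assert(!shouldRecache(levelDeserialized = true, readAsBytes = true))   // MEMORY_AND_DISK read as bytes
    assert(shouldRecache(levelDeserialized = false, readAsBytes = true))   // MEMORY_AND_DISK_SER read as bytes
    assert(!shouldRecache(levelDeserialized = false, readAsBytes = false)) // MEMORY_AND_DISK_SER read as values
  }
}
```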

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11533 from JoshRosen/remove-memorystore-level-mismatch.
Parent 78d3b605
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -495,10 +495,9 @@ private[spark] class BlockManager(
           }
         } else {
           // Otherwise, we also have to store something in the memory store
-          if (!level.deserialized || !asBlockResult) {
+          if (!level.deserialized && !asBlockResult) {
             /* We'll store the bytes in memory if the block's storage level includes
-             * "memory serialized", or if it should be cached as objects in memory
-             * but we only requested its serialized bytes. */
+             * "memory serialized" and we requested its serialized bytes. */
             memoryStore.putBytes(blockId, bytes.limit, () => {
               // https://issues.apache.org/jira/browse/SPARK-6076
               // If the file size is bigger than the free memory, OOM will happen. So if we cannot
core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -88,6 +88,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
   }

   override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()
@@ -106,6 +107,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
    * The caller should guarantee that `size` is correct.
    */
   def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     // Work on a duplicate - since the original input might be used elsewhere.
     lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
     val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
@@ -118,6 +120,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel): Either[Iterator[Any], Long] = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     putIterator(blockId, values, level, allowPersistToDisk = true)
   }
@@ -138,6 +141,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
       values: Iterator[Any],
       level: StorageLevel,
       allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     val unrolledValues = unrollSafely(blockId, values)
     unrolledValues match {
       case Left(arrayValues) =>
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -585,36 +585,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   }

   test("disk and memory storage") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingleAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = false)
   }

   test("disk and memory storage with getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytesAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = true)
   }

   test("disk and memory storage with serialization") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingleAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = false)
   }

   test("disk and memory storage with serialization and getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytesAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true)
   }

   def testDiskAndMemoryStorage(
       storageLevel: StorageLevel,
-      accessMethod: BlockManager => BlockId => Option[_]): Unit = {
+      getAsBytes: Boolean): Unit = {
     store = makeBlockManager(12000)
+    val accessMethod =
+      if (getAsBytes) store.getLocalBytesAndReleaseLock else store.getSingleAndReleaseLock
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, storageLevel)
     store.putSingle("a2", a2, storageLevel)
     store.putSingle("a3", a3, storageLevel)
-    assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
-    assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
-    assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
-    assert(accessMethod(store)("a1").isDefined, "a1 was not in store")
-    assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
+    assert(accessMethod("a2").isDefined, "a2 was not in store")
+    assert(accessMethod("a3").isDefined, "a3 was not in store")
+    assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
+    assert(accessMethod("a1").isDefined, "a1 was not in store")
+    val dataShouldHaveBeenCachedBackIntoMemory = {
+      if (storageLevel.deserialized) !getAsBytes
+      else getAsBytes
+    }
+    if (dataShouldHaveBeenCachedBackIntoMemory) {
+      assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
+    } else {
+      assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
+    }
   }

   test("LRU with mixed storage levels") {