diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index dcf359e3c29fe3e8db242de29a0966d19e23fa00..b38e2ec57fe323105d05017142da9f31585c361f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -495,10 +495,9 @@ private[spark] class BlockManager(
             }
           } else {
             // Otherwise, we also have to store something in the memory store
-            if (!level.deserialized || !asBlockResult) {
+            if (!level.deserialized && !asBlockResult) {
               /* We'll store the bytes in memory if the block's storage level includes
-               * "memory serialized", or if it should be cached as objects in memory
-               * but we only requested its serialized bytes. */
+               * "memory serialized" and we requested its serialized bytes. */
               memoryStore.putBytes(blockId, bytes.limit, () => {
                 // https://issues.apache.org/jira/browse/SPARK-6076
                 // If the file size is bigger than the free memory, OOM will happen. So if we cannot
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 12b70d1807994ebffdbada381618eb36623b1d78..bb72fe4bcafbeb95060f25eef50a6021a441b2f1 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -88,6 +88,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
   }
 
   override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()
@@ -106,6 +107,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    * The caller should guarantee that `size` is correct.
    */
  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     // Work on a duplicate - since the original input might be used elsewhere.
     lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
     val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
@@ -118,6 +120,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel): Either[Iterator[Any], Long] = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     putIterator(blockId, values, level, allowPersistToDisk = true)
   }
 
@@ -138,6 +141,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       values: Iterator[Any],
       level: StorageLevel,
       allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     val unrolledValues = unrollSafely(blockId, values)
     unrolledValues match {
       case Left(arrayValues) =>
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cfcbf1745d1b144ff5cdbb079ebf3c43ef020cc1..0485b0501c0300a62da2c439853ca199773b3cd8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -585,36 +585,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("disk and memory storage") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingleAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = false)
   }
 
   test("disk and memory storage with getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytesAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = true)
   }
 
   test("disk and memory storage with serialization") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingleAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = false)
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytesAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true)
  }
 
   def testDiskAndMemoryStorage(
       storageLevel: StorageLevel,
-      accessMethod: BlockManager => BlockId => Option[_]): Unit = {
+      getAsBytes: Boolean): Unit = {
     store = makeBlockManager(12000)
+    val accessMethod =
+      if (getAsBytes) store.getLocalBytesAndReleaseLock else store.getSingleAndReleaseLock
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, storageLevel)
     store.putSingle("a2", a2, storageLevel)
     store.putSingle("a3", a3, storageLevel)
-    assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
-    assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
-    assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
-    assert(accessMethod(store)("a1").isDefined, "a1 was not in store")
-    assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
+    assert(accessMethod("a2").isDefined, "a2 was not in store")
+    assert(accessMethod("a3").isDefined, "a3 was not in store")
+    assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
+    assert(accessMethod("a1").isDefined, "a1 was not in store")
+    val dataShouldHaveBeenCachedBackIntoMemory = {
+      if (storageLevel.deserialized) !getAsBytes
+      else getAsBytes
+    }
+    if (dataShouldHaveBeenCachedBackIntoMemory) {
+      assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
+    } else {
+      assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
+    }
   }
 
   test("LRU with mixed storage levels") {
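
The four new require(!contains(blockId), ...) guards turn a silent overwrite of an existing block into a fast failure (require throws IllegalArgumentException). A hedged sketch of a regression test that could pin this behavior down in BlockManagerSuite, assuming only the suite's existing makeBlockManager helper; the test name and body are illustrative and not part of this patch:

    test("MemoryStore rejects a block it already contains") {
      store = makeBlockManager(12000)
      val bytes = java.nio.ByteBuffer.wrap(new Array[Byte](100))
      store.memoryStore.putBytes(TestBlockId("dup"), bytes, StorageLevel.MEMORY_ONLY_SER)
      assert(store.memoryStore.contains(TestBlockId("dup")))
      // The new guard makes a second put of the same block id fail fast.
      intercept[IllegalArgumentException] {
        store.memoryStore.putBytes(TestBlockId("dup"), bytes, StorageLevel.MEMORY_ONLY_SER)
      }
    }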
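
The || -> && fix in BlockManager.scala and the new getAsBytes test parameter encode the same rule from two sides: a block read back from disk is re-cached into memory only when the form the storage level keeps in memory matches the form the caller requested (asBlockResult in BlockManager is the values-requested case, roughly the opposite of getAsBytes). A minimal standalone sketch of that rule, in plain Scala with invented names, not part of the patch:

    object CacheBackRule {
      // deserialized: the storage level keeps its in-memory copy as objects
      // asBytes: the caller asked for serialized bytes (getLocalBytes)
      //          rather than values (getSingle)
      def cachedBackIntoMemory(deserialized: Boolean, asBytes: Boolean): Boolean =
        if (deserialized) !asBytes else asBytes

      def main(args: Array[String]): Unit = {
        // Enumerate the four combinations covered by testDiskAndMemoryStorage.
        for (deserialized <- Seq(true, false); asBytes <- Seq(false, true)) {
          println(s"deserialized=$deserialized asBytes=$asBytes -> " +
            cachedBackIntoMemory(deserialized, asBytes))
        }
      }
    }

The deserialized = false, asBytes = true combination is the branch visible in the BlockManager hunk (!level.deserialized && !asBlockResult); the deserialized = true, asBytes = false combination is handled by a deserializing branch outside this hunk.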
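
One subtlety preserved in the second putBytes overload: rewind() is declared on java.nio.Buffer and returns Buffer (covariant ByteBuffer-returning overrides only arrived in Java 9), which is why the chained call ends in asInstanceOf[ByteBuffer]. A small illustration, separate from the patch:

    import java.nio.ByteBuffer

    object RewindCast {
      def main(args: Array[String]): Unit = {
        val original = ByteBuffer.wrap(Array[Byte](1, 2, 3))
        original.position(2) // simulate a partially consumed buffer
        // duplicate() shares content but has an independent position/limit;
        // rewind() returns java.nio.Buffer, so the cast restores the static type.
        val copy = original.duplicate().rewind().asInstanceOf[ByteBuffer]
        assert(copy.position() == 0 && original.position() == 2)
      }
    }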