未验证 提交 3a9c8e20 编写于 作者: 梦境迷离's avatar 梦境迷离 提交者: GitHub

Fix cache (#173)

* revert commit `STM`
上级 a622514e
......@@ -2,4 +2,5 @@ caffeine {
maximumSize = 100
expireAfterWriteSeconds = 60
disabledLog = true
calculateResultTimeout = "5 s"
}
\ No newline at end of file
......@@ -21,9 +21,9 @@
package org.bitlap.cacheable.caffeine
import zio.stm.STM
import zio.Chunk
import scala.concurrent.Await
import org.bitlap.cacheable.core._
import zio.ZIO
import zio.stream.ZStream
......@@ -38,12 +38,12 @@ object Implicits {
implicit def StreamUpdateCache[T]: ZStreamUpdateCache[Any, Throwable, T] = new ZStreamUpdateCache[Any, Throwable, T] {
override def evict(business: => ZStream[Any, Throwable, T])(identities: List[String]): ZStream[Any, Throwable, T] = {
ZStream.fromEffect(STM.atomically {
STM.foreach_(identities)(key => ZCaffeine.del(key))
}) *> ({
// 使用when会导致流变成空,继而使用runHead返回None
if (ZCaffeine.disabledLog) ZStream.unit else Utils.debugS(s"Caffeine ZStream update >>> identities:[$identities]")
} *> business)
for {
updateResult <- ZStream.fromIterable(identities).flatMap(key => ZStream.fromEffect(ZCaffeine.del(key))) *> business tap (ur =>
Utils.debug(s"Caffeine ZStream update: identities:[$identities], updateResult:[$ur]")
.unless(ZCaffeine.disabledLog)
)
} yield updateResult
}
}
......@@ -51,17 +51,19 @@ object Implicits {
override def getIfPresent(business: => ZStream[Any, Throwable, T])(identities: List[String], args: List[_]): ZStream[Any, Throwable, T] = {
val key = cacheKey(identities)
val field = cacheField(args)
// TODO fix it?
lazy val syncResult = zio.Runtime.default.unsafeRun(business.runCollect)
val stmResult = STM.atomically {
for {
chunk <- ZCaffeine.hGet[Chunk[T]](key, field).map(_.getOrElse(Chunk.empty))
ret <- if (chunk.isEmpty) ZCaffeine.hSet[Chunk[T]](key, field, syncResult) else STM.succeed(chunk)
} yield ret
}
val syncResultFuture = zio.Runtime.global.unsafeRunToFuture(business.runCollect)
lazy val result = Await.result(syncResultFuture, ZCaffeine.calculateResultTimeout)
for {
ret <- ZStream.fromEffect(stmResult)
_ <- if (ZCaffeine.disabledLog) ZStream.unit else Utils.debugS(s"Caffeine ZStream getIfPresent >>> identity:[$key],field:[$field],result:[$ret]")
chunk <- ZStream.fromEffect(
ZCaffeine.hGet[Chunk[T]](key, field).map(_.getOrElse(Chunk.empty))
.tap(cv =>
Utils.debug(s"Caffeine ZStream getIfPresent: identity:[$key],field:[$field],cacheValue:[$cv]")
.unless(ZCaffeine.disabledLog)
))
ret <- ZStream.fromEffect(if (chunk.isEmpty) ZCaffeine.hSet(key, field, result).as(result) else ZIO.succeed(chunk))
.tap(result => Utils.debug(s"Caffeine ZStream getIfPresent: identity:[$key],field:[$field],result:[$result]")
.unless(ZCaffeine.disabledLog)
)
r <- ZStream.fromIterable(ret)
} yield r
}
......@@ -69,12 +71,11 @@ object Implicits {
implicit def UpdateCache[T]: ZIOUpdateCache[Any, Throwable, T] = new ZIOUpdateCache[Any, Throwable, T] {
override def evict(business: => ZIO[Any, Throwable, T])(identities: List[String]): ZIO[Any, Throwable, T] = {
STM.atomically {
STM.foreach_(identities)(key => ZCaffeine.del(key))
} *> {
business.tap(updateResult => Utils.debug(s"Caffeine ZIO update >>> identities:[$identities],updateResult:[$updateResult]")
.unless(ZCaffeine.disabledLog))
}
for {
updateResult <- ZIO.foreach_(identities)(key => ZCaffeine.del(key)) *> business tap (updateResult =>
Utils.debug(s"Caffeine ZIO update: identities:[$identities], updateResult:[$updateResult]")
.unless(ZCaffeine.disabledLog))
} yield updateResult
}
}
......@@ -82,15 +83,16 @@ object Implicits {
override def getIfPresent(business: => ZIO[Any, Throwable, T])(identities: List[String], args: List[_]): ZIO[Any, Throwable, T] = {
val key = cacheKey(identities)
val field = cacheField(args)
lazy val syncResult = zio.Runtime.default.unsafeRun(business)
// TODO fix it?
STM.atomically {
for {
chunk <- ZCaffeine.hGet[T](key, field)
ret <- chunk.fold(ZCaffeine.hSet[T](key, field, syncResult))(c => STM.succeed(c))
} yield ret
}.tap(ret => Utils.debug(s"Caffeine ZIO getIfPresent >>> identity:[$key],field:[$field],result:[$ret]")
.unless(ZCaffeine.disabledLog))
for {
cacheValue <- ZCaffeine.hGet[T](key, field)
_ <- Utils.debug(s"Caffeine ZIO getIfPresent: identity:[$key], field:[$field], cacheValue:[$cacheValue]")
.unless(ZCaffeine.disabledLog)
result <- cacheValue.fold(business.tap(r => ZCaffeine.hSet(key, field, r).as(r)))(value => ZIO.effectTotal(value))
.tap(result =>
Utils.debug(s"Caffeine ZIO getIfPresent: identity:[$key], field:[$field], result:[$result]")
.unless(ZCaffeine.disabledLog)
)
} yield result
}
}
}
......@@ -23,10 +23,10 @@ package org.bitlap.cacheable.caffeine
import com.github.benmanes.caffeine.cache.{ Cache, Caffeine }
import com.typesafe.config.{ Config, ConfigFactory }
import org.bitlap.cacheable.core.Utils
import java.util.concurrent.{ ConcurrentHashMap, TimeUnit }
import zio.stm.{ TRef, ZSTM }
import zio.stm.USTM
import scala.concurrent.duration.Duration
/**
*
......@@ -35,66 +35,75 @@ import zio.stm.USTM
*/
object ZCaffeine {
import zio.{ Task, ZIO }
private val conf: Config = ConfigFactory.load("reference.conf")
private val custom: Config = ConfigFactory.load("application.conf").withFallback(conf)
private[caffeine] lazy val disabledLog: Boolean = custom.getBoolean("caffeine.disabledLog")
private[caffeine] lazy val calculateResultTimeout: Duration = Duration(custom.getString("caffeine.calculateResultTimeout"))
private lazy val maximumSize = custom.getInt("caffeine.maximumSize")
private lazy val expireAfterWriteSeconds = custom.getInt("caffeine.expireAfterWriteSeconds")
private lazy val maximumSize: Int = custom.getInt("caffeine.maximumSize")
private lazy val expireAfterWriteSeconds: Int = custom.getInt("caffeine.expireAfterWriteSeconds")
val hashCache: Cache[String, ConcurrentHashMap[String, Any]] = Caffeine.newBuilder()
.maximumSize(maximumSize)
.expireAfterWrite(expireAfterWriteSeconds, TimeUnit.SECONDS)
.build[String, ConcurrentHashMap[String, Any]]
private val cacheRef: USTM[TRef[Cache[String, ConcurrentHashMap[String, Any]]]] = TRef.make(hashCache)
def hGet[T](key: String, field: String): ZSTM[Any, Throwable, Option[T]] = {
for {
cache <- cacheRef
chm <- cache.get
} yield {
val hashMap = chm.getIfPresent(key)
if (hashMap == null || hashMap.isEmpty) {
None
} else {
val fieldValue = hashMap.get(field)
if (fieldValue == null || !fieldValue.isInstanceOf[T]) {
def hGet[T](key: String, field: String): Task[Option[T]] = {
Utils.effectBlocking {
key.synchronized {
val hashMap = hashCache.getIfPresent(key)
if (hashMap == null || hashMap.isEmpty) {
None
} else {
Some(fieldValue.asInstanceOf[T])
val fieldValue = hashMap.get(field)
if (fieldValue == null || !fieldValue.isInstanceOf[T]) {
None
} else {
Some(fieldValue.asInstanceOf[T])
}
}
}
}
}
def del(key: String): ZSTM[Any, Throwable, Unit] = {
for {
cache <- cacheRef
ret <- cache.update { stmCache =>
stmCache.invalidate(key)
stmCache
def hDel(key: String, field: String): Task[Unit] = {
Utils.effectBlocking {
key.synchronized {
val hashMap = hashCache.getIfPresent(key)
if (hashMap == null || hashMap.isEmpty) {
()
} else {
hashMap.remove(field)
hashCache.put(key, new ConcurrentHashMap(hashMap))
}
}
} yield ret
}
}
def hSet[T](key: String, field: String, value: Any): ZSTM[Any, Throwable, T] = {
for {
cache <- cacheRef
c <- cache.modify { stmCache =>
val hashMap = stmCache.getIfPresent(key)
def del(key: String): Task[Unit] = {
Utils.effectBlocking {
key.synchronized {
hashCache.invalidate(key)
}
}
}
def hSet(key: String, field: String, value: Any): Task[Unit] = {
Utils.effectBlocking {
key.synchronized {
val hashMap = hashCache.getIfPresent(key)
if (hashMap == null || hashMap.isEmpty) {
val chm = new ConcurrentHashMap[String, Any]()
chm.put(field, value)
stmCache.put(key, chm)
hashCache.put(key, chm)
} else {
hashMap.put(field, value)
stmCache.put(key, new ConcurrentHashMap(hashMap))
hashCache.put(key, new ConcurrentHashMap(hashMap))
}
stmCache -> stmCache
}
} yield c.getIfPresent(key).get(field).asInstanceOf[T]
}
}
}
......@@ -26,7 +26,6 @@ import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import zio.ZIO
import zio.stream.ZStream
import zio.stm.STM
import scala.util.Random
......@@ -102,10 +101,10 @@ class CacheEvictTest extends AnyFlatSpec with Matchers {
}
val result = runtime.unsafeRun(for {
_ <- STM.atomically(ZCaffeine.del("CacheableTest-" + readIOMethodName))
_ <- ZCaffeine.del("CacheableTest-" + readIOMethodName)
read <- readIOFunction(1, "hello")
update <- updateIOFunction(1, "hello")
cache <- STM.atomically(ZCaffeine.hGet[String]("CacheableTest-" + readIOMethodName, "1-hello"))
cache <- ZCaffeine.hGet[String]("CacheableTest-" + readIOMethodName, "1-hello")
} yield cache)
result shouldEqual None
}
......@@ -124,10 +123,10 @@ class CacheEvictTest extends AnyFlatSpec with Matchers {
}
val result = runtime.unsafeRun(for {
_ <- STM.atomically(ZCaffeine.del("CacheableTest-" + readStreamMethodName))
_ <- ZCaffeine.del("CacheableTest-" + readStreamMethodName)
read <- readStreamFunction(1, "hello").runHead
update <- updateStreamFunction(1, "hello").runHead
cache <- STM.atomically(ZCaffeine.hGet[String]("CacheableTest-" + readStreamMethodName, "1-hello"))
cache <- ZCaffeine.hGet[String]("CacheableTest-" + readStreamMethodName, "1-hello")
} yield cache
)
result shouldEqual None
......
......@@ -21,17 +21,14 @@
package org.bitlap.cacheable.caffeine
import org.bitlap.cacheable.core.cacheable
import org.bitlap.cacheable.core.{ cacheEvict, cacheable }
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import zio.stream.ZStream
import zio.{ Task, ZIO }
import org.bitlap.cacheable.core.cacheEvict
import zio.stm.STM
import zio.Chunk
import zio.{ Chunk, Task, ZIO }
import scala.util.Random
import java.util
import scala.util.Random
/**
*
......@@ -98,9 +95,9 @@ class CacheableTest extends AnyFlatSpec with Matchers {
}
val result = runtime.unsafeRun(for {
_ <- STM.atomically(ZCaffeine.del("CacheableTest-readIOFunction"))
_ <- ZCaffeine.del("CacheableTest-readIOFunction")
method <- readIOFunction(1, "hello")
cache <- STM.atomically(ZCaffeine.hGet[String]("CacheableTest-readIOFunction", "1-hello"))
cache <- ZCaffeine.hGet[String]("CacheableTest-readIOFunction", "1-hello")
} yield method -> cache
)
Some(result._1) shouldEqual result._2
......@@ -131,7 +128,7 @@ class CacheableTest extends AnyFlatSpec with Matchers {
}
val result = runtime.unsafeRun(for {
_ <- STM.atomically(ZCaffeine.del("CacheableTest-readIOFunction"))
_ <- ZCaffeine.del("CacheableTest-readIOFunction")
method <- readIOFunction(globalId.toInt).runHead
update <- updateIOFunction("lisi").runHead
after <- readIOFunction(globalId.toInt).runHead
......@@ -157,7 +154,7 @@ class CacheableTest extends AnyFlatSpec with Matchers {
val newId = Random.nextInt().toString
val result = runtime.unsafeRun(for {
_ <- STM.atomically(ZCaffeine.del("CacheableTest-readIOFunction"))
_ <- ZCaffeine.del("CacheableTest-readIOFunction")
method <- readIOFunction(globalId.toInt).runHead
update <- updateIOFunction(newId).runHead
after <- readIOFunction(globalId.toInt).runHead
......@@ -176,9 +173,9 @@ class CacheableTest extends AnyFlatSpec with Matchers {
println(chunk)
val result = runtime.unsafeRun(for {
_ <- STM.atomically(ZCaffeine.del("CacheableTest-readIOFunction"))
_ <- ZCaffeine.del("CacheableTest-readIOFunction")
method <- readIOFunction(1, "hello")
cache <- STM.atomically(ZCaffeine.hGet[Chunk[String]]("CacheableTest-readIOFunction", "1-hello").map(_.getOrElse(Chunk.empty)))
cache <- ZCaffeine.hGet[Chunk[String]]("CacheableTest-readIOFunction", "1-hello").map(_.getOrElse(Chunk.empty))
} yield method -> cache
)
result._1 shouldEqual result._2
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册