未验证 提交 7b507fcb 编写于 作者: D Dejan Mijić 提交者: GitHub

Hide executors (#809)

上级 9268e9d4
......@@ -49,21 +49,19 @@ object ZIORedisExample extends ZIOAppDefault {
def get[A: Schema]: BinaryCodec[A] = ProtobufCodec.protobufCodec
}
val myApp: ZIO[Redis, RedisError, Unit] = for {
redis <- ZIO.service[Redis]
_ <- redis.set("myKey", 8L, Some(1.minutes))
v <- redis.get("myKey").returning[Long]
_ <- Console.printLine(s"Value of myKey: $v").orDie
_ <- redis.hSet("myHash", ("k1", 6), ("k2", 2))
_ <- redis.rPush("myList", 1, 2, 3, 4)
_ <- redis.sAdd("mySet", "a", "b", "a", "c")
} yield ()
override def run = myApp.provide(
Redis.layer,
SingleNodeExecutor.local,
ZLayer.succeed[CodecSupplier](ProtobufCodecSupplier)
)
val myApp: ZIO[Redis, RedisError, Unit] =
for {
redis <- ZIO.service[Redis]
_ <- redis.set("myKey", 8L, Some(1.minutes))
v <- redis.get("myKey").returning[Long]
_ <- Console.printLine(s"Value of myKey: $v").orDie
_ <- redis.hSet("myHash", ("k1", 6), ("k2", 2))
_ <- redis.rPush("myList", 1, 2, 3, 4)
_ <- redis.sAdd("mySet", "a", "b", "a", "c")
} yield ()
override def run =
myApp.provide(Redis.local, ZLayer.succeed[CodecSupplier](ProtobufCodecSupplier))
}
```
......@@ -108,9 +106,8 @@ object EmbeddedRedisSpec extends ZIOSpecDefault {
}
).provideShared(
EmbeddedRedis.layer,
SingleNodeExecutor.layer,
ZLayer.succeed[CodecSupplier](ProtobufCodecSupplier),
Redis.layer
Redis.singleNode
) @@ TestAspect.silentLogging
}
```
......
......@@ -35,10 +35,9 @@ trait BenchmarkRuntime {
object BenchmarkRuntime {
private final val Layer =
ZLayer.make[Redis](
SingleNodeExecutor.local,
Redis.local,
ZLayer.succeed[CodecSupplier](new CodecSupplier {
def get[A: Schema]: BinaryCodec[A] = ProtobufCodec.protobufCodec
}),
Redis.layer
})
)
}
......@@ -43,11 +43,10 @@ object EmbeddedRedisSpec extends ZIOSpecDefault {
}
).provideShared(
EmbeddedRedis.layer,
SingleNodeExecutor.layer,
ZLayer.succeed[CodecSupplier](new CodecSupplier {
def get[A: Schema]: BinaryCodec[A] = ProtobufCodec.protobufCodec
}),
Redis.layer
Redis.singleNode
) @@ TestAspect.silentLogging
}
......@@ -33,8 +33,7 @@ object Main extends ZIOAppDefault {
AppConfig.layer,
ContributorsCache.layer,
HttpClientZioBackend.layer(),
Redis.layer,
SingleNodeExecutor.layer,
Redis.singleNode,
ZLayer.succeed[CodecSupplier](new CodecSupplier {
def get[A: Schema]: BinaryCodec[A] = ProtobufCodec.protobufCodec
})
......
......@@ -17,6 +17,7 @@
package zio.redis
import zio._
import zio.redis.internal._
trait Redis
extends api.Connection
......@@ -33,7 +34,16 @@ trait Redis
with api.Cluster
object Redis {
lazy val layer: URLayer[RedisExecutor with CodecSupplier, Redis] =
lazy val cluster: ZLayer[CodecSupplier & RedisClusterConfig, RedisError, Redis] =
ClusterExecutor.layer >>> makeLayer
lazy val local: ZLayer[CodecSupplier, RedisError.IOError, Redis] =
SingleNodeExecutor.local >>> makeLayer
lazy val singleNode: ZLayer[CodecSupplier & RedisConfig, RedisError.IOError, Redis] =
SingleNodeExecutor.layer >>> makeLayer
private def makeLayer: URLayer[CodecSupplier & RedisExecutor, Redis] =
ZLayer {
for {
codecSupplier <- ZIO.service[CodecSupplier]
......
......@@ -20,7 +20,7 @@ import zio.redis.Input._
import zio.redis.Output.{ChunkOutput, ClusterPartitionOutput, UnitOutput}
import zio.redis._
import zio.redis.api.Cluster.{AskingCommand, ClusterSetSlots, ClusterSlots}
import zio.redis.internal.{RedisCommand, RedisEnvironment}
import zio.redis.internal.{RedisCommand, RedisEnvironment, RedisExecutor}
import zio.redis.options.Cluster.SetSlotSubCommand._
import zio.redis.options.Cluster.{Partition, Slot}
import zio.{Chunk, IO}
......
/*
* Copyright 2021 John A. De Goes and the ZIO contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zio.redis.internal
import zio._
import zio.redis._
import zio.redis.options.Cluster.{Partition, Slot}
private[redis] final case class ClusterConnection(
partitions: Chunk[Partition],
executors: Map[RedisUri, ExecutorScope],
slots: Map[Slot, RedisUri]
) {
def executor(slot: Slot): Option[RedisExecutor] = executors.get(slots(slot)).map(_.executor)
def addExecutor(uri: RedisUri, es: ExecutorScope): ClusterConnection =
copy(executors = executors + (uri -> es))
}
......@@ -14,22 +14,23 @@
* limitations under the License.
*/
package zio.redis
package zio.redis.internal
import zio._
import zio.redis.ClusterExecutor._
import zio.redis._
import zio.redis.api.Cluster.AskingCommand
import zio.redis.internal.{RedisConnection, RespCommand, RespCommandArgument, RespValue}
import zio.redis.options.Cluster._
import java.io.IOException
final class ClusterExecutor private (
private[redis] final class ClusterExecutor private (
clusterConnection: Ref.Synchronized[ClusterConnection],
config: RedisClusterConfig,
scope: Scope.Closeable
) extends RedisExecutor {
import ClusterExecutor._
def execute(command: RespCommand): IO[RedisError, RespValue] = {
def execute(keySlot: Slot) =
......@@ -93,7 +94,7 @@ final class ClusterExecutor private (
}
}
object ClusterExecutor {
private[redis] object ClusterExecutor {
lazy val layer: ZLayer[RedisClusterConfig, RedisError, RedisExecutor] =
ZLayer.scoped {
......@@ -106,7 +107,7 @@ object ClusterExecutor {
} yield executor
}
private[redis] def create(
def create(
config: RedisClusterConfig,
scope: Scope.Closeable
): ZIO[Scope, RedisError, ClusterExecutor] =
......@@ -148,16 +149,15 @@ object ClusterExecutor {
_ <- layerScope.addFinalizerExit(closableScope.close(_))
} yield ExecutorScope(executor, closableScope)
private def redis(address: RedisUri) = {
val executorLayer = ZLayer.succeed(RedisConfig(address.host, address.port)) >>> SingleNodeExecutor.layer
val codecLayer = ZLayer.succeed[CodecSupplier](CodecSupplier.utf8)
val redisLayer = executorLayer ++ codecLayer >>> Redis.layer
private def redis(address: RedisUri) =
for {
closableScope <- Scope.make
configLayer = ZLayer.succeed(RedisConfig(address.host, address.port))
supplierLayer = ZLayer.succeed(CodecSupplier.utf8)
redisLayer = ZLayer.make[Redis](configLayer, supplierLayer, Redis.singleNode)
layer <- closableScope.extend[Any](redisLayer.memoize)
_ <- logScopeFinalizer("Temporary redis connection is closed")
} yield (layer, closableScope)
}
private def slotAddress(partitions: Chunk[Partition]) =
partitions.flatMap { p =>
......@@ -166,6 +166,7 @@ object ClusterExecutor {
private final val CusterKeyExecutorError =
RedisError.IOError(new IOException("Executor doesn't found. No way to dispatch this command to Redis Cluster"))
private final val CusterConnectionError =
RedisError.IOError(new IOException("The connection to cluster has been failed. Can't reach a single startup node."))
}
/*
* Copyright 2021 John A. De Goes and the ZIO contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zio.redis.internal
import zio.Scope
private[redis] final case class ExecutorScope(executor: RedisExecutor, scope: Scope.Closeable)
......@@ -16,7 +16,7 @@
package zio.redis.internal
import zio.redis.{CodecSupplier, RedisExecutor}
import zio.redis.CodecSupplier
import zio.schema.Schema
import zio.schema.codec.BinaryCodec
......
......@@ -14,11 +14,11 @@
* limitations under the License.
*/
package zio.redis
package zio.redis.internal
import zio.IO
import zio.redis.internal.{RespCommand, RespValue}
import zio.redis.RedisError
trait RedisExecutor {
private[redis] def execute(command: RespCommand): IO[RedisError, RespValue]
private[redis] trait RedisExecutor {
def execute(command: RespCommand): IO[RedisError, RespValue]
}
......@@ -14,13 +14,13 @@
* limitations under the License.
*/
package zio.redis
package zio.redis.internal
import zio._
import zio.redis.SingleNodeExecutor._
import zio.redis.internal.{RedisConnection, RespCommand, RespValue}
import zio.redis.internal.SingleNodeExecutor._
import zio.redis.{RedisConfig, RedisError}
final class SingleNodeExecutor private (
private[redis] final class SingleNodeExecutor private (
connection: RedisConnection,
requests: Queue[Request],
responses: Queue[Promise[RedisError, RespValue]]
......@@ -70,20 +70,14 @@ final class SingleNodeExecutor private (
}
object SingleNodeExecutor {
private[redis] object SingleNodeExecutor {
lazy val layer: ZLayer[RedisConfig, RedisError.IOError, RedisExecutor] =
RedisConnection.layer >>> makeLayer
lazy val local: ZLayer[Any, RedisError.IOError, RedisExecutor] =
RedisConnection.local >>> makeLayer
private final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue])
private final val True: Any => Boolean = _ => true
private final val RequestQueueSize = 16
private[redis] def create(connection: RedisConnection): URIO[Scope, SingleNodeExecutor] =
def create(connection: RedisConnection): URIO[Scope, SingleNodeExecutor] =
for {
requests <- Queue.bounded[Request](RequestQueueSize)
responses <- Queue.unbounded[Promise[RedisError, RespValue]]
......@@ -92,6 +86,12 @@ object SingleNodeExecutor {
_ <- logScopeFinalizer(s"$executor Node Executor is closed")
} yield executor
private final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue])
private final val True: Any => Boolean = _ => true
private final val RequestQueueSize = 16
private def makeLayer: ZLayer[RedisConnection, RedisError.IOError, RedisExecutor] =
ZLayer.scoped(ZIO.serviceWithZIO[RedisConnection](create))
}
/*
* Copyright 2021 John A. De Goes and the ZIO contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zio.redis
import zio._
package object internal {
private[redis] def logScopeFinalizer(msg: String): URIO[Scope, Unit] =
for {
scope <- ZIO.scope
_ <- scope.addFinalizerExit {
case Exit.Success(_) => ZIO.logTrace(s"$msg with success")
case Exit.Failure(th) => ZIO.logTraceCause(s"$msg with failure", th)
}
} yield ()
}
......@@ -16,30 +16,16 @@
package zio.redis.options
import zio.redis.{RedisExecutor, RedisUri}
import zio.{Chunk, Scope}
import zio.Chunk
import zio.redis.RedisUri
object Cluster {
private[redis] final val SlotsAmount = 16384
final case class ExecutorScope(executor: RedisExecutor, scope: Scope.Closeable)
final case class ClusterConnection(
partitions: Chunk[Partition],
executors: Map[RedisUri, ExecutorScope],
slots: Map[Slot, RedisUri]
) {
def executor(slot: Slot): Option[RedisExecutor] = executors.get(slots(slot)).map(_.executor)
def addExecutor(uri: RedisUri, es: ExecutorScope): ClusterConnection =
copy(executors = executors + (uri -> es))
}
final case class Slot(number: Long) extends AnyVal
object Slot {
val Default: Slot = Slot(1)
final val Default: Slot = Slot(1)
}
final case class Node(id: String, address: RedisUri)
......
......@@ -28,13 +28,4 @@ package object redis
with options.Scripting {
type Id[+A] = A
private[redis] def logScopeFinalizer(msg: String): URIO[Scope, Unit] =
for {
scope <- ZIO.scope
_ <- scope.addFinalizerExit {
case Exit.Success(_) => ZIO.logTrace(s"$msg with success")
case Exit.Failure(th) => ZIO.logTraceCause(s"$msg with failure", th)
}
} yield ()
}
......@@ -34,11 +34,7 @@ object ApiSpec
hashSuite,
streamsSuite,
scriptingSpec
).provideShared(
SingleNodeExecutor.local,
Redis.layer,
ZLayer.succeed(ProtobufCodecSupplier)
)
).provideShared(Redis.local, ZLayer.succeed(ProtobufCodecSupplier))
private val clusterSuite =
suite("Cluster executor")(
......@@ -55,8 +51,7 @@ object ApiSpec
scriptingSpec,
clusterSpec
).provideShared(
ClusterExecutor.layer,
Redis.layer,
Redis.cluster,
ZLayer.succeed(ProtobufCodecSupplier),
ZLayer.succeed(RedisClusterConfig(Chunk(RedisUri("localhost", 5000))))
).filterNotTags(_.contains(BaseSpec.ClusterExecutorUnsupported))
......
......@@ -499,9 +499,8 @@ object KeysSpec {
ZLayer
.make[Redis](
ZLayer.succeed(RedisConfig("localhost", 6380)),
SingleNodeExecutor.layer,
ZLayer.succeed[CodecSupplier](ProtobufCodecSupplier),
Redis.layer
Redis.singleNode
)
.fresh
}
package zio.redis
package zio.redis.internal
import zio._
import zio.redis.internal.CRC16
import zio.redis._
import zio.redis.options.Cluster.{Slot, SlotsAmount}
import zio.test._
......@@ -28,9 +28,7 @@ object ClusterExecutorSpec extends BaseSpec {
value2 <- redis.get(key).returning[String] // have to redirect without error ASK
value3 <- redis.get(key).returning[String] // have to redirect without creating new connection
_ <- ZIO.serviceWithZIO[Redis](_.setSlotStable(keySlot)).provideLayer(destMasterConn)
} yield {
assertTrue(value1 == value2) && assertTrue(value2 == value3)
}
} yield assertTrue(value1 == value2) && assertTrue(value2 == value3)
} @@ TestAspect.flaky,
test("check client responsiveness when Moved redirect happened") {
for {
......@@ -59,28 +57,25 @@ object ClusterExecutorSpec extends BaseSpec {
_ <- ZIO.serviceWithZIO[Redis](_.setSlotNode(keySlot, destMaster.id)).provideLayer(sourceMasterConn)
value2 <- redis.get(key).returning[String] // have to refresh connection
value3 <- redis.get(key).returning[String] // have to get value without refreshing connection
} yield {
assertTrue(value1 == value2) && assertTrue(value2 == value3)
}
} yield assertTrue(value1 == value2) && assertTrue(value2 == value3)
}
).provideLayerShared(ClusterLayer)
private final def getRedisNodeLayer(uri: RedisUri): Layer[Any, Redis] =
ZLayer.make[Redis](
ZLayer.succeed(RedisConfig(uri.host, uri.port)),
SingleNodeExecutor.layer,
ZLayer.succeed(ProtobufCodecSupplier),
Redis.layer
Redis.singleNode
)
private val ClusterLayer: Layer[Any, Redis] = {
val address1 = RedisUri("localhost", 5010)
val address2 = RedisUri("localhost", 5000)
ZLayer.make[Redis](
ZLayer.succeed(RedisClusterConfig(Chunk(address1, address2))),
ClusterExecutor.layer.orDie,
ZLayer.succeed(ProtobufCodecSupplier),
Redis.layer
Redis.cluster
)
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册