未验证 提交 30db77ac 编写于 作者: D Dejan Mijić 提交者: GitHub

Pass connection info through layers (#235)

上级 a145b6da
......@@ -12,5 +12,9 @@ trait BenchmarksUtils {
unsafeRunner.unsafeRun(f)
def zioUnsafeRun(source: ZIO[RedisExecutor, RedisError, Unit]): Unit =
unsafeRun(source.provideLayer(Logging.ignore >>> RedisExecutor.live(RedisHost, RedisPort).orDie))
unsafeRun(source.provideLayer(BenchmarksUtils.Layer))
}
object BenchmarksUtils {
private final val Layer = Logging.ignore >>> RedisExecutor.default.orDie
}
......@@ -11,13 +11,11 @@ import io.lettuce.core.ClientOptions
import laserdisc.fs2.RedisClient
trait RedisClients {
import RedisClients._
implicit val cs: ContextShift[CatsIO] = CatsIO.contextShift(ExecutionContext.global)
implicit val timer: Timer[CatsIO] = CatsIO.timer(ExecutionContext.global)
final val RedisHost = "127.0.0.1"
final val RedisPort = 6379
type RedisIO[A] = Redis[CatsIO, A]
type Redis4CatsClient[V] = RedisCommands[CatsIO, String, V]
......@@ -69,3 +67,8 @@ trait RedisClients {
Redis[CatsIO].withOptions(s"redis://$RedisHost:$RedisPort", ClientOptions.create(), longCodec)
}
}
object RedisClients {
private final val RedisHost = "127.0.0.1"
private final val RedisPort = 6379
}
package zio.redis
import java.io.{ EOFException, IOException }
import java.net.{ InetAddress, InetSocketAddress, SocketAddress, StandardSocketOptions }
import java.net.{ InetSocketAddress, SocketAddress, StandardSocketOptions }
import java.nio.ByteBuffer
import java.nio.channels.{ AsynchronousSocketChannel, Channel, CompletionHandler }
......@@ -15,22 +15,22 @@ private[redis] object ByteStream {
def write(chunk: Chunk[Byte]): IO[IOException, Unit]
}
def live(host: String, port: Int): ZLayer[Logging, RedisError.IOError, Has[Service]] =
live(new InetSocketAddress(host, port))
lazy val default: ZLayer[Logging, RedisError.IOError, Has[ByteStream.Service]] =
ZLayer.succeed(RedisConfig.Default) ++ ZLayer.identity[Logging] >>> live
def live(address: => SocketAddress): ZLayer[Logging, RedisError.IOError, Has[Service]] = connect(address)
def loopback(port: Int = RedisExecutor.DefaultPort): ZLayer[Logging, RedisError.IOError, Has[Service]] =
live(new InetSocketAddress(InetAddress.getLoopbackAddress, port))
lazy val live: ZLayer[Logging with Has[RedisConfig], RedisError.IOError, Has[ByteStream.Service]] =
ZLayer.fromServiceManaged[RedisConfig, Logging, RedisError.IOError, Service] { config =>
connect(new InetSocketAddress(config.host, config.port))
}
private[this] def connect(address: => SocketAddress): ZLayer[Logging, RedisError.IOError, Has[Service]] =
private[this] def connect(address: => SocketAddress): ZManaged[Logging, RedisError.IOError, ByteStream.Service] =
(for {
address <- UIO(address).toManaged_
makeBuffer = IO.effectTotal(ByteBuffer.allocateDirect(ResponseBufferSize))
readBuffer <- makeBuffer.toManaged_
writeBuffer <- makeBuffer.toManaged_
channel <- openChannel(address)
} yield new Connection(readBuffer, writeBuffer, channel)).mapError(RedisError.IOError).toLayer
} yield new Connection(readBuffer, writeBuffer, channel)).mapError(RedisError.IOError)
private[this] final val ResponseBufferSize = 1024
......
package zio.redis
import java.io.IOException
import java.net.SocketAddress
import scala.collection.compat.immutable.LazyList
......@@ -22,16 +21,13 @@ trait Interpreter {
def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue]
}
def live(address: => SocketAddress): ZLayer[Logging, RedisError.IOError, RedisExecutor] =
(ZLayer.identity[Logging] ++ ByteStream.live(address)) >>> StreamedExecutor
lazy val default: ZLayer[Logging, RedisError.IOError, RedisExecutor] =
ZLayer.identity[Logging] ++ ByteStream.default >>> StreamedExecutor
def live(host: String, port: Int = DefaultPort): ZLayer[Logging, RedisError.IOError, RedisExecutor] =
(ZLayer.identity[Logging] ++ ByteStream.live(host, port)) >>> StreamedExecutor
lazy val live: ZLayer[Logging with Has[RedisConfig], RedisError.IOError, RedisExecutor] =
ZLayer.identity[Logging] ++ ByteStream.live >>> StreamedExecutor
def loopback(port: Int = DefaultPort): ZLayer[Logging, RedisError.IOError, RedisExecutor] =
(ZLayer.identity[Logging] ++ ByteStream.loopback(port)) >>> StreamedExecutor
val test: ZLayer[zio.random.Random, Nothing, RedisExecutor] = {
lazy val test: URLayer[zio.random.Random, RedisExecutor] = {
val makePickRandom: URIO[zio.random.Random, Int => USTM[Int]] =
for {
seed <- random.nextInt
......@@ -52,8 +48,6 @@ trait Interpreter {
}
}
private[redis] final val DefaultPort = 6379
private[this] final val RequestQueueSize = 16
private[this] final val StreamedExecutor =
......
package zio.redis
final case class RedisConfig(host: String, port: Int)
object RedisConfig {
lazy val Default = RedisConfig("localhost", 6379)
}
......@@ -29,7 +29,7 @@ object ApiSpec
hyperLogLogSuite,
hashSuite,
streamsSuite
).provideCustomLayerShared(Logging.ignore >>> Executor ++ Clock.live),
).provideCustomLayerShared(Clock.live ++ Logging.ignore >>> RedisExecutor.default.orDie),
suite("Test Executor")(
connectionSuite,
setsSuite
......@@ -37,6 +37,4 @@ object ApiSpec
.get
.provideCustomLayerShared(RedisExecutor.test)
)
private val Executor = RedisExecutor.loopback().orDie
}
......@@ -18,5 +18,5 @@ object ByteStreamSpec extends BaseSpec {
res <- stream.read.runHead
} yield assert(res)(isSome(equalTo('*'.toByte)))
}
).provideCustomLayer(Logging.ignore >>> ByteStream.loopback().orDie)
).provideCustomLayer(Logging.ignore >>> ByteStream.default.orDie)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册