未验证 提交 f0fff253 编写于 作者: A Aleksandar Novaković 提交者: GitHub

Add logging capability (#141)

上级 84cd02a8
......@@ -5,6 +5,7 @@ import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
import zio.ZIO
import zio.logging.Logging
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
......@@ -26,10 +27,9 @@ class PutBenchmarks {
@Benchmark
def laserdisc(): Unit = {
import _root_.laserdisc._
import _root_.laserdisc.auto._
import _root_.laserdisc.fs2._
import _root_.laserdisc.{ all => cmd }
import _root_.laserdisc.{ all => cmd, _ }
import cats.instances.list._
import cats.syntax.foldable._
......@@ -76,7 +76,7 @@ class PutBenchmarks {
def zio(): Unit = {
val effect = ZIO
.foreach_(items)(i => set(i, i, None, None, None))
.provideLayer(RedisExecutor.live(RedisHost, RedisPort).orDie)
.provideLayer(Logging.ignore >>> RedisExecutor.live(RedisHost, RedisPort).orDie)
unsafeRun(effect)
}
......
......@@ -44,6 +44,7 @@ lazy val redis =
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio-streams" % "1.0.3",
"dev.zio" %% "zio-logging" % "0.5.3",
"dev.zio" %% "zio-test" % "1.0.3" % Test,
"dev.zio" %% "zio-test-sbt" % "1.0.3" % Test
),
......
......@@ -6,6 +6,7 @@ import java.nio.ByteBuffer
import java.nio.channels.{ AsynchronousByteChannel, AsynchronousSocketChannel, Channel, CompletionHandler }
import zio._
import zio.logging._
import zio.stream.Stream
trait Interpreter {
......@@ -29,7 +30,8 @@ trait Interpreter {
private final class Live(
reqQueue: Queue[Request],
resQueue: Queue[Promise[RedisError, RespValue]],
byteStream: Managed[IOException, ByteStream.ReadWriteBytes]
byteStream: Managed[IOException, ByteStream.ReadWriteBytes],
logger: Logger[String]
) extends Service {
override def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue] =
......@@ -66,30 +68,32 @@ trait Interpreter {
sendWith(rwBytes.write).forever race runReceive(rwBytes.read)
}
.tapError { e =>
// TODO - log that a reconnection is happening
resQueue.takeAll.flatMap(IO.foreach_(_)(_.fail(e)))
logger.warn(s"Reconnecting due to error: $e") *>
resQueue.takeAll.flatMap(IO.foreach_(_)(_.fail(e)))
}
.retryWhile(Function.const(true))
.tapError(e => logger.error(s"Executor exiting: $e"))
}
private def fromBytestream: ZLayer[ByteStream, RedisError.IOError, RedisExecutor] =
ZLayer.fromServiceManaged { env =>
for {
reqQueue <- Queue.bounded[Request](RequestQueueSize).toManaged_
resQueue <- Queue.unbounded[Promise[RedisError, RespValue]].toManaged_
live = new Live(reqQueue, resQueue, env.connect)
_ <- live.run.forkManaged
} yield live
private def fromBytestream: ZLayer[ByteStream with Logging, RedisError.IOError, RedisExecutor] =
ZLayer.fromServicesManaged[ByteStream.Service, Logger[String], Any, RedisError.IOError, RedisExecutor.Service] {
(byteStream: ByteStream.Service, logging: Logger[String]) =>
for {
reqQueue <- Queue.bounded[Request](RequestQueueSize).toManaged_
resQueue <- Queue.unbounded[Promise[RedisError, RespValue]].toManaged_
live = new Live(reqQueue, resQueue, byteStream.connect, logging)
_ <- live.run.forkManaged
} yield live
}
def live(address: SocketAddress): Layer[RedisError.IOError, RedisExecutor] =
ByteStream.socket(address).mapError(RedisError.IOError) >>> fromBytestream
def live(address: SocketAddress): ZLayer[Logging, RedisError.IOError, RedisExecutor] =
(ZLayer.identity[Logging] ++ ByteStream.socket(address).mapError(RedisError.IOError)) >>> fromBytestream
def live(host: String, port: Int = DefaultPort): Layer[RedisError.IOError, RedisExecutor] =
ByteStream.socket(host, port).mapError(RedisError.IOError) >>> fromBytestream
def live(host: String, port: Int = DefaultPort): ZLayer[Logging, RedisError.IOError, RedisExecutor] =
(ZLayer.identity[Logging] ++ ByteStream.socket(host, port).mapError(RedisError.IOError)) >>> fromBytestream
def loopback(port: Int = DefaultPort): Layer[RedisError.IOError, RedisExecutor] =
ByteStream.socketLoopback(port).mapError(RedisError.IOError) >>> fromBytestream
def loopback(port: Int = DefaultPort): ZLayer[Logging, RedisError.IOError, RedisExecutor] =
(ZLayer.identity[Logging] ++ ByteStream.socketLoopback(port).mapError(RedisError.IOError)) >>> fromBytestream
}
......@@ -134,29 +138,31 @@ trait Interpreter {
Left(IO.effect(channel.close()).ignore)
}
def socket(host: String, port: Int): Layer[IOException, ByteStream] =
def socket(host: String, port: Int): ZLayer[Logging, IOException, ByteStream] =
socket(IO.effectTotal(new InetSocketAddress(host, port)))
def socket(address: SocketAddress): Layer[IOException, ByteStream] = socket(UIO.succeed(address))
def socket(address: SocketAddress): ZLayer[Logging, IOException, ByteStream] = socket(UIO.succeed(address))
def socketLoopback(port: Int): Layer[IOException, ByteStream] =
def socketLoopback(port: Int): ZLayer[Logging, IOException, ByteStream] =
socket(IO.effectTotal(new InetSocketAddress(InetAddress.getLoopbackAddress, port)))
private def socket(getAddress: UIO[SocketAddress]): Layer[IOException, ByteStream] = {
private def socket(getAddress: UIO[SocketAddress]): ZLayer[Logging, IOException, ByteStream] = {
val makeBuffer = IO.effectTotal(ByteBuffer.allocateDirect(ResponseBufferSize))
ZLayer.fromEffect {
ZLayer.fromServiceM { logger =>
for {
address <- getAddress
readBuffer <- makeBuffer
writeBuffer <- makeBuffer
} yield new Connection(address, readBuffer, writeBuffer)
} yield new Connection(address, readBuffer, writeBuffer, logger)
}
}
private final class Connection(
address: SocketAddress,
readBuffer: ByteBuffer,
writeBuffer: ByteBuffer
writeBuffer: ByteBuffer,
logger: Logger[String]
) extends Service {
private def openChannel: Managed[IOException, AsynchronousSocketChannel] = {
......@@ -168,6 +174,7 @@ trait Interpreter {
channel
}
_ <- effectAsyncChannel[AsynchronousSocketChannel, Void](channel)(c => c.connect(address, null, _))
_ <- logger.info("Connected to the redis server.")
} yield channel
Managed.fromAutoCloseable(make).refineToOrDie[IOException]
}
......
package zio.redis
import zio.clock.Clock
import zio.logging.Logging
import zio.test._
object ApiSpec
......@@ -23,7 +24,7 @@ object ApiSpec
geoSuite,
hyperLogLogSuite,
hashSuite
).provideCustomLayerShared(Executor ++ Clock.live)
).provideCustomLayerShared(Logging.ignore >>> Executor ++ Clock.live)
private val Executor = RedisExecutor.loopback().orDie
}
......@@ -2,6 +2,7 @@ package zio.redis
import java.nio.charset.StandardCharsets
import zio.Chunk
import zio.logging.Logging
import zio.test.Assertion._
import zio.test._
......@@ -18,5 +19,5 @@ object ByteStreamSpec extends BaseSpec {
}
}
).provideCustomLayerShared(ByteStream.socketLoopback(RedisExecutor.DefaultPort).orDie)
).provideCustomLayerShared(Logging.ignore >>> ByteStream.socketLoopback(RedisExecutor.DefaultPort).orDie)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册