未验证 提交 17ae7e36 编写于 作者: A Anatoly Sergeev 提交者: GitHub

update libraries versions (#684)

上级 340672de
......@@ -39,7 +39,7 @@ jobs:
fail-fast: false
matrix:
java: ['17']
scala: ['2.12.16', '2.13.8', '3.2.0']
scala: ['2.12.17', '2.13.8', '3.2.1']
steps:
- name: Checkout current branch
uses: actions/checkout@v3.1.0
......@@ -66,7 +66,7 @@ jobs:
fail-fast: false
matrix:
java: ['8', '11', '17']
scala: ['2.12.16', '2.13.8', '3.2.0']
scala: ['2.12.17', '2.13.8', '3.2.1']
steps:
- name: Checkout current branch
uses: actions/checkout@v3.1.0
......
......@@ -19,7 +19,7 @@ package zio.redis.benchmarks
import cats.effect.{IO => CIO}
import zio._
import zio.redis._
import zio.schema.codec.{Codec, ProtobufCodec}
import zio.schema.codec.{BinaryCodec, ProtobufCodec}
trait BenchmarkRuntime {
final def execute(query: ZIO[Redis, RedisError, Unit]): Unit =
......@@ -35,7 +35,7 @@ object BenchmarkRuntime {
private final val Layer =
ZLayer.make[Redis](
RedisExecutor.local,
ZLayer.succeed[Codec](ProtobufCodec),
ZLayer.succeed[BinaryCodec](ProtobufCodec),
RedisLive.layer
)
}
......@@ -16,6 +16,10 @@ inThisBuild(
)
)
ThisBuild / libraryDependencySchemes ++= Seq(
"org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always
)
addCommandAlias("compileBenchmarks", "benchmarks/Jmh/compile")
addCommandAlias("compileSources", "example/Test/compile; redis/Test/compile")
addCommandAlias("check", "fixCheck; fmtCheck")
......@@ -41,12 +45,12 @@ lazy val redis =
.settings(stdSettings("zio-redis"))
.settings(
libraryDependencies ++= List(
"dev.zio" %% "zio-streams" % "2.0.2",
"dev.zio" %% "zio-logging" % "2.1.3",
"dev.zio" %% "zio-schema" % "0.2.1",
"dev.zio" %% "zio-schema-protobuf" % "0.2.1" % Test,
"dev.zio" %% "zio-test" % "2.0.2" % Test,
"dev.zio" %% "zio-test-sbt" % "2.0.2" % Test,
"dev.zio" %% "zio-streams" % "2.0.4",
"dev.zio" %% "zio-logging" % "2.1.4",
"dev.zio" %% "zio-schema" % "0.3.0",
"dev.zio" %% "zio-schema-protobuf" % "0.3.0" % Test,
"dev.zio" %% "zio-test" % "2.0.4" % Test,
"dev.zio" %% "zio-test-sbt" % "2.0.4" % Test,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1"
),
testFrameworks := List(new TestFramework("zio.test.sbt.ZTestFramework"))
......@@ -65,7 +69,7 @@ lazy val benchmarks =
"dev.profunktor" %% "redis4cats-effects" % "1.2.0",
"io.chrisdavenport" %% "rediculous" % "0.4.0",
"io.laserdisc" %% "laserdisc-fs2" % "0.5.0",
"dev.zio" %% "zio-schema-protobuf" % "0.2.1"
"dev.zio" %% "zio-schema-protobuf" % "0.3.1"
)
)
......@@ -77,13 +81,13 @@ lazy val example =
.settings(
publish / skip := true,
libraryDependencies ++= List(
"com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % "3.8.0",
"com.softwaremill.sttp.client3" %% "zio-json" % "3.8.0",
"dev.zio" %% "zio-streams" % "2.0.2",
"com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % "3.8.3",
"com.softwaremill.sttp.client3" %% "zio-json" % "3.8.3",
"dev.zio" %% "zio-streams" % "2.0.3",
"dev.zio" %% "zio-config-magnolia" % "3.0.2",
"dev.zio" %% "zio-config-typesafe" % "3.0.2",
"dev.zio" %% "zio-schema-protobuf" % "0.2.1",
"dev.zio" %% "zio-json" % "0.3.0-RC11",
"dev.zio" %% "zio-schema-protobuf" % "0.3.1",
"dev.zio" %% "zio-json" % "0.3.0",
"io.d11" %% "zhttp" % "2.0.0-RC11"
)
)
......@@ -22,9 +22,12 @@ import sttp.client3.asynchttpclient.zio.AsyncHttpClientZioBackend
import zhttp.service.Server
import zio._
import zio.redis.{RedisExecutor, RedisLive}
import zio.schema.codec.{Codec, ProtobufCodec}
import zio.schema.codec.{BinaryCodec, ProtobufCodec}
import scala.annotation.nowarn
object Main extends ZIOAppDefault {
@nowarn("cat=deprecation")
def run: ZIO[ZIOAppArgs with Scope, Any, ExitCode] =
Server
.start(9000, Api.routes)
......@@ -34,7 +37,7 @@ object Main extends ZIOAppDefault {
ContributorsCacheLive.layer,
RedisExecutor.layer,
RedisLive.layer,
ZLayer.succeed[Codec](ProtobufCodec)
ZLayer.succeed[BinaryCodec](ProtobufCodec)
)
.exitCode
}
sbt.version = 1.7.3
sbt.version = 1.8.0
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.3")
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.4")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.10.4")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.11")
addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.8.0")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.3.3")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.9.0")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.3.6")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.3")
addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.0.0+80-e5b408eb-SNAPSHOT")
addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.0.0+84-6fd7d64e-SNAPSHOT")
libraryDependencies += "org.snakeyaml" % "snakeyaml-engine" % "2.4"
......
......@@ -21,7 +21,7 @@ import zio.redis.ClusterExecutor._
import zio.redis.api.Cluster.AskingCommand
import zio.redis.codec.StringUtf8Codec
import zio.redis.options.Cluster._
import zio.schema.codec.Codec
import zio.schema.codec.BinaryCodec
import java.io.IOException
......@@ -70,8 +70,9 @@ final case class ClusterExecutor(
// TODO introduce max connection amount
private def executor(address: RedisUri): IO[RedisError.IOError, RedisExecutor] =
clusterConnectionRef.modifyZIO { cc =>
val executorOpt = cc.executors.get(address).map(es => (es.executor, cc))
val enrichedClusterIO = scope.extend(connectToNode(address)).map(es => (es.executor, cc.addExecutor(address, es)))
val executorOpt = cc.executors.get(address).map(es => (es.executor, cc))
val enrichedClusterIO =
scope.extend[Any](connectToNode(address)).map(es => (es.executor, cc.addExecutor(address, es)))
ZIO.fromOption(executorOpt).catchAll(_ => enrichedClusterIO)
}
......@@ -79,12 +80,12 @@ final case class ClusterExecutor(
clusterConnectionRef.updateZIO { connection =>
val addresses = connection.partitions.flatMap(_.addresses)
for {
cluster <- scope.extend(initConnectToCluster(addresses))
cluster <- scope.extend[Any](initConnectToCluster(addresses))
_ <- ZIO.foreachParDiscard(connection.executors) { case (_, es) => es.scope.close(Exit.unit) }
} yield cluster
}
private val RetryPolicy: Schedule[Any, Throwable, (zio.Duration, Long, Throwable)] =
private val RetryPolicy: Schedule[Any, Throwable, (Duration, Long, Throwable)] =
Schedule.exponential(config.retry.base, config.retry.factor) &&
Schedule.recurs(config.retry.maxRecurs) &&
Schedule.recurWhile[Throwable] {
......@@ -101,7 +102,7 @@ object ClusterExecutor {
config <- ZIO.service[RedisClusterConfig]
layerScope <- ZIO.scope
clusterScope <- Scope.make
executor <- clusterScope.extend(create(config, clusterScope))
executor <- clusterScope.extend[Any](create(config, clusterScope))
_ <- layerScope.addFinalizerExit(e => clusterScope.close(e))
} yield executor
}
......@@ -142,19 +143,19 @@ object ClusterExecutor {
private def connectToNode(address: RedisUri) =
for {
closableScope <- Scope.make
connection <- closableScope.extend(RedisConnectionLive.create(RedisConfig(address.host, address.port)))
executor <- closableScope.extend(SingleNodeExecutor.create(connection))
connection <- closableScope.extend[Any](RedisConnectionLive.create(RedisConfig(address.host, address.port)))
executor <- closableScope.extend[Any](SingleNodeExecutor.create(connection))
layerScope <- ZIO.scope
_ <- layerScope.addFinalizerExit(closableScope.close(_))
} yield ExecutorScope(executor, closableScope)
private def redis(address: RedisUri) = {
val executorLayer = ZLayer.succeed(RedisConfig(address.host, address.port)) >>> RedisExecutor.layer
val codecLayer = ZLayer.succeed[Codec](StringUtf8Codec)
val codecLayer = ZLayer.succeed[BinaryCodec](StringUtf8Codec)
val redisLayer = executorLayer ++ codecLayer >>> RedisLive.layer
for {
closableScope <- Scope.make
layer <- closableScope.extend(redisLayer.memoize)
layer <- closableScope.extend[Any](redisLayer.memoize)
_ <- logScopeFinalizer("Temporary redis connection is closed")
} yield (layer, closableScope)
}
......
......@@ -19,22 +19,22 @@ package zio.redis
import zio._
import zio.redis.options.Cluster.{Node, Partition, SlotRange}
import zio.schema.Schema
import zio.schema.codec.Codec
import zio.schema.codec.BinaryCodec
sealed trait Output[+A] {
self =>
private[redis] final def unsafeDecode(respValue: RespValue)(implicit codec: Codec): A =
private[redis] final def unsafeDecode(respValue: RespValue)(implicit codec: BinaryCodec): A =
respValue match {
case error: RespValue.Error => throw error.toRedisError
case success => tryDecode(success)
}
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): A
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): A
final def map[B](f: A => B): Output[B] =
new Output[B] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): B = f(self.tryDecode(respValue))
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): B = f(self.tryDecode(respValue))
}
}
......@@ -46,11 +46,11 @@ object Output {
def apply[A](implicit output: Output[A]): Output[A] = output
case object RespValueOutput extends Output[RespValue] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): RespValue = respValue
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): RespValue = respValue
}
case object BoolOutput extends Output[Boolean] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Boolean =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Boolean =
respValue match {
case RespValue.Integer(0) => false
case RespValue.Integer(1) => true
......@@ -59,7 +59,7 @@ object Output {
}
final case class ChunkOutput[+A](output: Output[A]) extends Output[Chunk[A]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[A] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[A] =
respValue match {
case RespValue.NullArray => Chunk.empty
case RespValue.Array(values) => values.map(output.tryDecode)
......@@ -68,7 +68,7 @@ object Output {
}
final case class ZRandMemberOutput[+A](output: Output[A]) extends Output[Chunk[A]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[A] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[A] =
respValue match {
case RespValue.NullBulkString => Chunk.empty
case RespValue.NullArray => Chunk.empty
......@@ -78,7 +78,7 @@ object Output {
}
final case class ChunkTuple2Output[+A, +B](_1: Output[A], _2: Output[B]) extends Output[Chunk[(A, B)]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[(A, B)] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[(A, B)] =
respValue match {
case RespValue.NullArray =>
Chunk.empty
......@@ -92,7 +92,7 @@ object Output {
}
final case class ZRandMemberTuple2Output[+A, +B](_1: Output[A], _2: Output[B]) extends Output[Chunk[(A, B)]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[(A, B)] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[(A, B)] =
respValue match {
case RespValue.NullBulkString => Chunk.empty
case RespValue.NullArray => Chunk.empty
......@@ -106,7 +106,7 @@ object Output {
}
case object DoubleOutput extends Output[Double] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Double =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Double =
respValue match {
case RespValue.BulkString(bytes) => decodeDouble(bytes)
case other => throw ProtocolError(s"$other isn't a double.")
......@@ -114,7 +114,7 @@ object Output {
}
private object DurationOutput extends Output[Long] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Long =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Long =
respValue match {
case RespValue.Integer(-2L) => throw ProtocolError("Key not found.")
case RespValue.Integer(-1L) => throw ProtocolError("Key has no expire.")
......@@ -128,7 +128,7 @@ object Output {
final val DurationSecondsOutput: Output[Duration] = DurationOutput.map(_.seconds)
case object LongOutput extends Output[Long] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Long =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Long =
respValue match {
case RespValue.Integer(v) => v
case other => throw ProtocolError(s"$other isn't an integer")
......@@ -136,7 +136,7 @@ object Output {
}
final case class OptionalOutput[+A](output: Output[A]) extends Output[Option[A]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Option[A] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Option[A] =
respValue match {
case RespValue.NullBulkString | RespValue.NullArray => None
case RespValue.BulkString(value) if value.isEmpty => None
......@@ -145,7 +145,7 @@ object Output {
}
final case class ScanOutput[+A](output: Output[A]) extends Output[(Long, Chunk[A])] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): (Long, Chunk[A]) =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): (Long, Chunk[A]) =
respValue match {
case RespValue.ArrayValues(cursor @ RespValue.BulkString(_), RespValue.Array(items)) =>
(cursor.asLong, items.map(output.tryDecode))
......@@ -155,7 +155,7 @@ object Output {
}
case object KeyElemOutput extends Output[Option[(String, String)]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Option[(String, String)] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Option[(String, String)] =
respValue match {
case RespValue.NullArray =>
None
......@@ -166,7 +166,7 @@ object Output {
}
case object StringOutput extends Output[String] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): String =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): String =
respValue match {
case RespValue.SimpleString(s) => s
case other => throw ProtocolError(s"$other isn't a simple string")
......@@ -174,7 +174,7 @@ object Output {
}
case object MultiStringOutput extends Output[String] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): String =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): String =
respValue match {
case s @ RespValue.BulkString(_) => s.asString
case other => throw ProtocolError(s"$other isn't a bulk string")
......@@ -182,7 +182,7 @@ object Output {
}
case object BulkStringOutput extends Output[Chunk[Byte]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[Byte] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[Byte] =
respValue match {
case RespValue.BulkString(value) => value
case other => throw ProtocolError(s"$other isn't a bulk string")
......@@ -190,15 +190,15 @@ object Output {
}
final case class ArbitraryOutput[A]()(implicit schema: Schema[A]) extends Output[A] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): A =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): A =
respValue match {
case RespValue.BulkString(s) => codec.decode(schema)(s).fold(e => throw CodecError(e), identity)
case RespValue.BulkString(s) => codec.decode(schema)(s).fold(e => throw CodecError(e.message), identity)
case other => throw ProtocolError(s"$other isn't a bulk string")
}
}
final case class Tuple2Output[+A, +B](_1: Output[A], _2: Output[B]) extends Output[(A, B)] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): (A, B) =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): (A, B) =
respValue match {
case RespValue.ArrayValues(a: RespValue, b: RespValue) => (_1.tryDecode(a), _2.tryDecode(b))
case other => throw ProtocolError(s"$other isn't a tuple2")
......@@ -206,7 +206,7 @@ object Output {
}
final case class Tuple3Output[+A, +B, +C](_1: Output[A], _2: Output[B], _3: Output[C]) extends Output[(A, B, C)] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): (A, B, C) =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): (A, B, C) =
respValue match {
case RespValue.ArrayValues(a: RespValue, b: RespValue, c: RespValue) =>
(_1.tryDecode(a), _2.tryDecode(b), _3.tryDecode(c))
......@@ -215,7 +215,7 @@ object Output {
}
case object SingleOrMultiStringOutput extends Output[String] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): String =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): String =
respValue match {
case RespValue.SimpleString(s) => s
case s @ RespValue.BulkString(_) => s.asString
......@@ -224,7 +224,7 @@ object Output {
}
final case class MultiStringChunkOutput[+A](output: Output[A]) extends Output[Chunk[A]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[A] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[A] =
respValue match {
case RespValue.NullBulkString => Chunk.empty
case s @ RespValue.BulkString(_) => Chunk.single(output.tryDecode(s))
......@@ -234,7 +234,7 @@ object Output {
}
case object TypeOutput extends Output[RedisType] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): RedisType =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): RedisType =
respValue match {
case RespValue.SimpleString("string") => RedisType.String
case RespValue.SimpleString("list") => RedisType.List
......@@ -247,7 +247,7 @@ object Output {
}
case object UnitOutput extends Output[Unit] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Unit =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Unit =
respValue match {
case RespValue.SimpleString("OK") => ()
case other => throw ProtocolError(s"$other isn't unit.")
......@@ -255,7 +255,7 @@ object Output {
}
case object ResetOutput extends Output[Unit] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Unit =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Unit =
respValue match {
case RespValue.SimpleString("RESET") => ()
case other => throw ProtocolError(s"$other isn't unit.")
......@@ -263,7 +263,7 @@ object Output {
}
case object GeoOutput extends Output[Chunk[Option[LongLat]]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[Option[LongLat]] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[Option[LongLat]] =
respValue match {
case RespValue.NullArray =>
Chunk.empty
......@@ -281,7 +281,7 @@ object Output {
}
case object GeoRadiusOutput extends Output[Chunk[GeoView]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[GeoView] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[GeoView] =
respValue match {
case RespValue.Array(elements) =>
elements.map {
......@@ -307,7 +307,7 @@ object Output {
}
final case class KeyValueOutput[K, V](outK: Output[K], outV: Output[V]) extends Output[Map[K, V]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Map[K, V] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Map[K, V] =
respValue match {
case RespValue.NullArray =>
Map.empty[K, V]
......@@ -336,7 +336,7 @@ object Output {
keySchema: Schema[K],
valueSchema: Schema[V]
) extends Output[StreamEntry[I, K, V]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): StreamEntry[I, K, V] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): StreamEntry[I, K, V] =
respValue match {
case RespValue.Array(Seq(id @ RespValue.BulkString(_), value)) =>
val entryId = ArbitraryOutput[I]().unsafeDecode(id)
......@@ -352,7 +352,7 @@ object Output {
keySchema: Schema[K],
valueSchema: Schema[V]
) extends Output[Chunk[StreamEntry[I, K, V]]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[StreamEntry[I, K, V]] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[StreamEntry[I, K, V]] =
ChunkOutput(StreamEntryOutput[I, K, V]()).unsafeDecode(respValue)
}
......@@ -362,14 +362,14 @@ object Output {
keySchema: Schema[K],
valueSchema: Schema[V]
) extends Output[StreamChunk[N, I, K, V]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): StreamChunk[N, I, K, V] = {
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): StreamChunk[N, I, K, V] = {
val (name, entries) = Tuple2Output(ArbitraryOutput[N](), StreamEntriesOutput[I, K, V]()).unsafeDecode(respValue)
StreamChunk(name, entries)
}
}
case object StreamGroupsInfoOutput extends Output[Chunk[StreamGroupsInfo]] {
override protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[StreamGroupsInfo] =
override protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[StreamGroupsInfo] =
respValue match {
case RespValue.NullArray => Chunk.empty
case RespValue.Array(messages) =>
......@@ -408,7 +408,7 @@ object Output {
}
case object StreamConsumersInfoOutput extends Output[Chunk[StreamConsumersInfo]] {
override protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[StreamConsumersInfo] =
override protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[StreamConsumersInfo] =
respValue match {
case RespValue.NullArray => Chunk.empty
case RespValue.Array(messages) =>
......@@ -448,7 +448,7 @@ object Output {
extends Output[StreamInfoWithFull.FullStreamInfo[I, K, V]] {
override protected def tryDecode(
respValue: RespValue
)(implicit codec: Codec): StreamInfoWithFull.FullStreamInfo[I, K, V] = {
)(implicit codec: BinaryCodec): StreamInfoWithFull.FullStreamInfo[I, K, V] = {
var streamInfoFull: StreamInfoWithFull.FullStreamInfo[I, K, V] = StreamInfoWithFull.FullStreamInfo.empty
respValue match {
// Note that you should not rely on the fields exact position. see https://redis.io/commands/xinfo
......@@ -584,7 +584,7 @@ object Output {
}
final case class StreamInfoOutput[I: Schema, K: Schema, V: Schema]() extends Output[StreamInfo[I, K, V]] {
override protected def tryDecode(respValue: RespValue)(implicit codec: Codec): StreamInfo[I, K, V] = {
override protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): StreamInfo[I, K, V] = {
var streamInfo: StreamInfo[I, K, V] = StreamInfo.empty
respValue match {
// Note that you should not rely on the fields exact position. see https://redis.io/commands/xinfo
......@@ -625,7 +625,7 @@ object Output {
}
case object XPendingOutput extends Output[PendingInfo] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): PendingInfo =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): PendingInfo =
respValue match {
case RespValue.Array(Seq(RespValue.Integer(total), f, l, ps)) =>
val first = OptionalOutput(MultiStringOutput).unsafeDecode(f)
......@@ -657,7 +657,7 @@ object Output {
}
case object PendingMessagesOutput extends Output[Chunk[PendingMessage]] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Chunk[PendingMessage] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[PendingMessage] =
respValue match {
case RespValue.Array(messages) =>
messages.collect {
......@@ -680,7 +680,7 @@ object Output {
}
case object SetOutput extends Output[Boolean] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Boolean =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Boolean =
respValue match {
case RespValue.NullBulkString => false
case RespValue.SimpleString(_) => true
......@@ -689,7 +689,7 @@ object Output {
}
case object StrAlgoLcsOutput extends Output[LcsOutput] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): LcsOutput =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): LcsOutput =
respValue match {
case result @ RespValue.BulkString(_) => LcsOutput.Lcs(result.asString)
case RespValue.Integer(length) => LcsOutput.Length(length)
......@@ -728,7 +728,7 @@ object Output {
}
case object ClientTrackingInfoOutput extends Output[ClientTrackingInfo] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): ClientTrackingInfo =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): ClientTrackingInfo =
respValue match {
case RespValue.NullArray => throw ProtocolError(s"Array must not be empty")
case RespValue.Array(values) if values.length % 2 == 0 =>
......@@ -792,7 +792,7 @@ object Output {
}
case object ClientTrackingRedirectOutput extends Output[ClientTrackingRedirect] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): ClientTrackingRedirect =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): ClientTrackingRedirect =
respValue match {
case RespValue.Integer(-1L) => ClientTrackingRedirect.NotEnabled
case RespValue.Integer(0L) => ClientTrackingRedirect.NotRedirected
......@@ -802,7 +802,7 @@ object Output {
}
case object ClusterPartitionOutput extends Output[Partition] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Partition =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Partition =
respValue match {
case RespValue.NullArray => throw ProtocolError(s"Array must not be empty")
case RespValue.Array(values) =>
......@@ -816,7 +816,7 @@ object Output {
}
case object ClusterPartitionNodeOutput extends Output[Node] {
protected def tryDecode(respValue: RespValue)(implicit codec: Codec): Node =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Node =
respValue match {
case RespValue.NullArray => throw ProtocolError(s"Array must not be empty")
case RespValue.Array(values) =>
......
......@@ -18,7 +18,7 @@ package zio.redis
import zio._
import zio.redis.Input.{StringInput, Varargs}
import zio.schema.codec.Codec
import zio.schema.codec.BinaryCodec
final class RedisCommand[-In, +Out] private (val name: String, val input: Input[In], val output: Output[Out]) {
private[redis] def run(in: In): ZIO[Redis, RedisError, Out] =
......@@ -30,7 +30,7 @@ final class RedisCommand[-In, +Out] private (val name: String, val input: Input[
}
.refineToOrDie[RedisError]
private[redis] def resp(in: In, codec: Codec): Chunk[RespValue.BulkString] =
private[redis] def resp(in: In, codec: BinaryCodec): Chunk[RespValue.BulkString] =
Varargs(StringInput).encode(name.split(" "))(codec) ++ input.encode(in)(codec)
}
......
......@@ -37,7 +37,7 @@ final class SingleNodeExecutor(
*/
val run: IO[RedisError, AnyVal] =
ZIO.logTrace(s"$this Executable sender and reader has been started") *>
(send.repeat(Schedule.forever) race receive)
(send.repeat[Any, Long](Schedule.forever) race receive)
.tapError(e => ZIO.logWarning(s"Reconnecting due to error: $e") *> drainWith(e))
.retryWhile(True)
.tapError(e => ZIO.logError(s"Executor exiting: $e"))
......
......@@ -18,46 +18,46 @@ package zio.redis.codec
import zio.redis.RedisError.CodecError
import zio.schema.Schema
import zio.schema.StandardType.{DoubleType, IntType, LongType}
import zio.schema.codec.Codec
import zio.schema.codec.BinaryCodec.{BinaryDecoder, BinaryEncoder, BinaryStreamDecoder, BinaryStreamEncoder}
import zio.schema.codec.{BinaryCodec, DecodeError}
import zio.stream.ZPipeline
import zio.{Chunk, ZIO}
import zio.{Cause, Chunk, ZIO}
import java.nio.charset.StandardCharsets
private[redis] object StringUtf8Codec extends Codec {
def encoder[A](schema: Schema[A]): ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks(values => values.flatMap(Encoder.encode(schema, _)))
private[redis] object StringUtf8Codec extends BinaryCodec {
def encode[A](schema: Schema[A]): A => Chunk[Byte] = { a =>
Encoder.encode(schema, a)
}
def encoderFor[A](schema: Schema[A]): BinaryEncoder[A] =
new BinaryEncoder[A] {
def decoder[A](schema: Schema[A]): ZPipeline[Any, String, Byte, A] =
ZPipeline.mapChunksZIO(chunk => ZIO.fromEither(Decoder.decode(schema, chunk).map(Chunk(_))))
override def encode(value: A): Chunk[Byte] =
schema match {
case Schema.Primitive(_, _) => Chunk.fromArray(value.toString.getBytes(StandardCharsets.UTF_8))
case _ => throw CodecError("the codec support only primitives")
}
def decode[A](schema: Schema[A]): Chunk[Byte] => Either[String, A] = { ch =>
Decoder.decode(schema, ch)
}
override def streamEncoder: BinaryStreamEncoder[A] =
ZPipeline.mapChunks(_.flatMap(encode))
object Encoder {
def encode[A](schema: Schema[A], value: A): Chunk[Byte] =
schema match {
case Schema.Primitive(_, _) => Chunk.fromArray(value.toString.getBytes(StandardCharsets.UTF_8))
case _ => throw CodecError("the codec support only primitives")
}
}
object Decoder {
def decode[A](schema: Schema[A], chunk: Chunk[Byte]): Either[String, A] = {
def utf8String = new String(chunk.toArray, StandardCharsets.UTF_8)
schema match {
case Schema.Primitive(IntType, _) => Right(utf8String.toInt.asInstanceOf[A])
case Schema.Primitive(LongType, _) => Right(utf8String.toLong.asInstanceOf[A])
case Schema.Primitive(DoubleType, _) => Right(utf8String.toDouble.asInstanceOf[A])
case Schema.Primitive(_, _) => Right(utf8String.asInstanceOf[A])
case _ => Left("the codec support only primitives")
}
def decoderFor[A](schema: Schema[A]): BinaryDecoder[A] =
new BinaryDecoder[A] {
override def decode(chunk: Chunk[Byte]): Either[DecodeError, A] = {
def utf8String = new String(chunk.toArray, StandardCharsets.UTF_8)
schema match {
case Schema.Primitive(IntType, _) => Right(utf8String.toInt.asInstanceOf[A])
case Schema.Primitive(LongType, _) => Right(utf8String.toLong.asInstanceOf[A])
case Schema.Primitive(DoubleType, _) => Right(utf8String.toDouble.asInstanceOf[A])
case Schema.Primitive(_, _) => Right(utf8String.asInstanceOf[A])
case _ => Left(DecodeError.ReadError(Cause.empty, "the codec support only primitives"))
}
}
override def streamDecoder: BinaryStreamDecoder[A] =
ZPipeline.mapChunksZIO(chunk => ZIO.fromEither(decode(chunk).map(Chunk(_))))
}
}
}
......@@ -17,16 +17,16 @@
package zio.redis
import zio._
import zio.schema.codec.Codec
import zio.schema.codec.BinaryCodec
trait Redis {
def codec: Codec
def codec: BinaryCodec
def executor: RedisExecutor
}
final case class RedisLive(codec: Codec, executor: RedisExecutor) extends Redis
final case class RedisLive(codec: BinaryCodec, executor: RedisExecutor) extends Redis
object RedisLive {
lazy val layer: URLayer[RedisExecutor with Codec, Redis] =
lazy val layer: URLayer[RedisExecutor with BinaryCodec, Redis] =
ZLayer.fromFunction(RedisLive.apply _)
}
package zio.redis
import zio._
import zio.schema.codec.{Codec, ProtobufCodec}
import zio.schema.codec.{BinaryCodec, ProtobufCodec}
import zio.test.TestAspect.tag
import zio.test._
......@@ -9,7 +9,7 @@ import java.time.Instant
import java.util.UUID
trait BaseSpec extends ZIOSpecDefault {
implicit val codec: Codec = ProtobufCodec
implicit val codec: BinaryCodec = ProtobufCodec
override def aspects: Chunk[TestAspectAtLeastR[Live]] =
Chunk.succeed(TestAspect.timeout(10.seconds))
......
......@@ -2,7 +2,7 @@ package zio.redis
import zio._
import zio.redis.RedisError.ProtocolError
import zio.schema.codec.{Codec, ProtobufCodec}
import zio.schema.codec.{BinaryCodec, ProtobufCodec}
import zio.test.Assertion.{exists => _, _}
import zio.test.TestAspect.{restore => _, _}
import zio.test._
......@@ -420,7 +420,7 @@ object KeysSpec {
ZLayer.succeed(RedisConfig("localhost", 6380)),
RedisConnectionLive.layer,
SingleNodeExecutor.layer,
ZLayer.succeed[Codec](ProtobufCodec),
ZLayer.succeed[BinaryCodec](ProtobufCodec),
RedisLive.layer
)
.fresh
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册