未验证 提交 80348113 编写于 作者: D Dejan Mijić 提交者: GitHub

Wraps up the demo app (#241)

上级 8113326b
......@@ -37,10 +37,10 @@ lazy val redis =
.settings(buildInfoSettings("zio.redis"))
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio-streams" % "1.0.3",
"dev.zio" %% "zio-streams" % Zio,
"dev.zio" %% "zio-logging" % "0.5.4",
"dev.zio" %% "zio-test" % "1.0.3" % Test,
"dev.zio" %% "zio-test-sbt" % "1.0.3" % Test
"dev.zio" %% "zio-test" % Zio % Test,
"dev.zio" %% "zio-test-sbt" % Zio % Test
),
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)
......@@ -68,15 +68,16 @@ lazy val example =
.settings(
skip in publish := true,
libraryDependencies ++= Seq(
"io.scalac" %% "zio-akka-http-interop" % "0.4.0",
"com.softwaremill.sttp.client" %% "core" % "2.2.9",
"com.softwaremill.sttp.client" %% "async-http-client-backend-zio" % "2.2.9",
"com.softwaremill.sttp.client" %% "circe" % "2.2.9",
"io.circe" %% "circe-core" % "0.12.3",
"io.circe" %% "circe-generic" % "0.12.3",
"de.heikoseeberger" %% "akka-http-circe" % "1.31.0",
"dev.zio" %% "zio-streams" % Zio,
"dev.zio" %% "zio-config-magnolia" % "1.0.0-RC30-1",
"dev.zio" %% "zio-config-typesafe" % "1.0.0-RC30-1"
"dev.zio" %% "zio-config-typesafe" % "1.0.0-RC30-1",
"dev.zio" %% "zio-prelude" % "1.0.0-RC1",
"io.circe" %% "circe-core" % "0.12.3",
"io.circe" %% "circe-generic" % "0.12.3",
"io.scalac" %% "zio-akka-http-interop" % "0.4.0"
),
scalacOptions in Compile := Seq("-Xlint:unused")
)
api {
host = "0.0.0.0"
port = 8080
}
example = {
redis = {
host = "0.0.0.0"
port = 6379
}
redis {
host = "0.0.0.0"
port = 6379
}
\ No newline at end of file
server = {
host = "0.0.0.0"
port = 8080
}
}
package example
sealed trait ApiError {
def asThrowable: Throwable =
this match {
case GithubUnavailable(msg) => new Throwable(msg)
case NoContributors(msg) => new Throwable(msg)
}
}
import scala.util.control.NoStackTrace
import akka.http.interop.ErrorResponse
import akka.http.scaladsl.model.{ HttpResponse, StatusCodes }
sealed trait ApiError extends NoStackTrace
final case class GithubUnavailable(msg: String) extends ApiError
final case class NoContributors(msg: String) extends ApiError
object ApiError {
case object CacheMiss extends ApiError
case object CorruptedData extends ApiError
case object GithubUnreachable extends ApiError
case object UnknownProject extends ApiError
implicit val errorResponse: ErrorResponse[ApiError] = {
case CorruptedData | GithubUnreachable => HttpResponse(StatusCodes.InternalServerError)
case CacheMiss | UnknownProject => HttpResponse(StatusCodes.NotFound)
}
}
package example
import io.circe.{ Decoder, Encoder }
import zio.prelude.Newtype
object Contributions extends Newtype[Int] {
implicit val decoder: Decoder[Contributions] = Decoder[Int].map(Contributions(_))
implicit val encoder: Encoder[Contributions] = Encoder[Int].contramap(Contributions.unwrap)
}
package example
import io.circe.Codec
import io.circe.generic.semiauto._
import io.circe.{ Decoder, _ }
final case class Contributor(login: String, contributions: Int)
final case class Contributor(login: Login, contributions: Contributions)
object Contributor {
implicit val decoder: Decoder[Contributor] = deriveDecoder[Contributor]
implicit val encoder: Encoder[Contributor] = deriveEncoder[Contributor]
implicit val codec: Codec[Contributor] = deriveCodec[Contributor]
}
package example
import example.Contributor._
import io.circe
import io.circe.parser.decode
import io.circe.syntax._
import sttp.client.asynchttpclient.zio.SttpClient
import sttp.client.circe.asJson
import sttp.client.{ UriContext, basicRequest }
import sttp.model.Uri
import io.circe.Codec
import io.circe.generic.semiauto._
import zio._
import zio.duration._
import zio.redis._
import zio.Chunk
object Contributors {
trait Service {
def getContributors(organization: String, repository: String): IO[ApiError, Chunk[Contributor]]
}
lazy val live: ZLayer[RedisExecutor with SttpClient, Nothing, Contributors] =
ZLayer.fromFunction { env =>
new Service {
override def getContributors(organization: String, repository: String): IO[ApiError, Chunk[Contributor]] =
sMembers(repository).flatMap { response => // key should be unique combination of org + repo
if (response.isEmpty)
for {
contributors <- fetchContributors(organization, repository)
_ <- sAdd(repository, contributors.asJson.toString, contributors.map(_.asJson.toString): _*)
_ <- pExpire(repository, 1.minute)
} yield contributors
else
deserialize(response).orElseFail(GithubUnavailable("Github Client Unavailable"))
}.orElseFail(GithubUnavailable("Github Client Unavailable")).provide(env)
private def deserialize(response: Chunk[String]): IO[circe.Error, Chunk[Contributor]] =
response.mapM(contributor => ZIO.fromEither(decode[Contributor](contributor)))
final case class Contributors(contributors: Chunk[Contributor]) extends AnyVal
private def fetchContributors(organization: String, repository: String): IO[ApiError, Chunk[Contributor]] = {
val request = basicRequest
.get(buildUrl(organization, repository))
.response(asJson[List[Contributor]])
SttpClient
.send(request)
.map(_.body)
.flatMap { case Right(value) => ZIO.succeed(Chunk.fromIterable(value)) }
.orElseFail(GithubUnavailable("Github Client Unavailable"))
.provide(env)
}
private def buildUrl(organization: String, repository: String): Uri =
uri"https://api.github.com/repos/$organization/$repository/contributors"
}
}
object Contributors {
implicit val codec: Codec[Contributors] = deriveCodec[Contributors]
}
package example
import example.ApiError._
import example.Contributor._
import io.circe.parser.decode
import io.circe.syntax._
import sttp.client.asynchttpclient.zio.SttpClient
import sttp.client.circe.asJson
import sttp.client.{ UriContext, basicRequest }
import sttp.model.Uri
import zio._
import zio.duration._
import zio.redis._
object ContributorsCache {
trait Service {
def fetchAll(repository: Repository): IO[ApiError, Contributors]
}
lazy val live: ZLayer[RedisExecutor with SttpClient, Nothing, ContributorsCache] =
ZLayer.fromFunction { env =>
new Service {
def fetchAll(repository: Repository): IO[ApiError, Contributors] =
(read(repository) <> retrieve(repository)).provide(env)
}
}
private[this] def read(repository: Repository): ZIO[RedisExecutor, ApiError, Contributors] =
get(repository.key)
.someOrFail(ApiError.CacheMiss)
.map(decode[Contributors])
.rightOrFail(ApiError.CorruptedData)
.refineToOrDie[ApiError]
private[this] def retrieve(repository: Repository): ZIO[RedisExecutor with SttpClient, ApiError, Contributors] =
for {
req <- ZIO.succeed(basicRequest.get(urlOf(repository)).response(asJson[Chunk[Contributor]]))
res <- SttpClient.send(req).orElseFail(GithubUnreachable)
contributors <- res.body.fold(_ => ZIO.fail(UnknownProject), ZIO.succeed(_))
_ <- cache(repository, contributors)
} yield Contributors(contributors)
private def cache(repository: Repository, contributors: Chunk[Contributor]): URIO[RedisExecutor, Any] =
ZIO
.fromOption(NonEmptyChunk.fromChunk(contributors))
.map(Contributors(_).asJson.noSpaces)
.flatMap(data => set(repository.key, data, Some(1.minute)).orDie)
.ignore
private[this] def urlOf(repository: Repository): Uri =
uri"https://api.github.com/repos/${repository.owner}/${repository.name}/contributors"
}
package example
import io.circe.{ Decoder, Encoder }
import zio.prelude.Newtype
object Login extends Newtype[String] {
implicit val decoder: Decoder[Login] = Decoder[String].map(Login(_))
implicit val encoder: Encoder[Login] = Encoder[String].contramap(Login.unwrap)
}
......@@ -18,32 +18,31 @@ import zio.redis.RedisExecutor
object Main extends App {
def run(args: List[String]): URIO[ZEnv, ExitCode] =
ZIO(ConfigFactory.load.resolve)
.flatMap(rawConfig => program.provideCustomLayer(prepareEnvironment(rawConfig)))
ZIO
.effect(ConfigFactory.load().getConfig("example"))
.map(makeLayer)
.flatMap(runServer)
.exitCode
private val program: RIO[HttpServer with ZEnv, Unit] =
HttpServer.start.tapM(_ => putStrLn("Server online.")).useForever
private def makeLayer(rawConfig: Config): TaskLayer[HttpServer] = {
val config = TypesafeConfig.fromTypesafeConfig(rawConfig, AppConfig.descriptor)
val serverConfig = config.narrow(_.server)
val redisConfig = config.narrow(_.redis)
private def prepareEnvironment(rawConfig: Config): TaskLayer[HttpServer] = {
val configLayer = TypesafeConfig.fromTypesafeConfig(rawConfig, AppConfig.descriptor)
val actorSystem =
ZManaged
.make(ZIO.succeed(ActorSystem("zio-redis-example")))(as => ZIO.fromFuture(_ => as.terminate()).either)
.toLayer
val actorSystemLayer =
ZManaged.make {
ZIO(ActorSystem("zio-redis-example"))
} { system =>
ZIO.fromFuture(_ => system.terminate()).either
}.toLayer
val redis = Logging.ignore ++ redisConfig >>> RedisExecutor.live
val sttp = AsyncHttpClientZioBackend.layer()
val cache = redis ++ sttp >>> ContributorsCache.live
val api = cache >>> Api.live
val routes = ZLayer.fromService[Api.Service, Route](_.routes)
val apiConfigLayer = configLayer.narrow(_.api)
val redisLayer = Logging.ignore ++ configLayer.narrow(_.redis) >>> RedisExecutor.live.orDie
val sttpLayer = AsyncHttpClientZioBackend.layer()
val contributorsLayer = redisLayer ++ sttpLayer >>> Contributors.live
val apiLayer = contributorsLayer >>> Api.live
val routesLayer = ZLayer.fromService[Api.Service, Route](_.routes)
(actorSystemLayer ++ apiConfigLayer ++ (apiLayer >>> routesLayer)) >>> HttpServer.live
(actorSystem ++ serverConfig ++ (api >>> routes)) >>> HttpServer.live
}
private def runServer(layer: TaskLayer[HttpServer]): RIO[ZEnv, Nothing] =
HttpServer.start.tapM(_ => putStrLn("Server online.")).useForever.provideCustomLayer(layer)
}
package example
import zio.prelude.Newtype
object Name extends Newtype[String]
package example
import zio.prelude.Newtype
object Owner extends Newtype[String]
package example
final case class Repository(owner: Owner, name: Name) {
lazy val key: String = s"$owner:$name"
}
package example.api
import akka.http.interop.{ ErrorResponse, ZIOSupport }
import akka.http.scaladsl.model.{ HttpResponse, StatusCodes }
import akka.http.interop.ZIOSupport
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
......@@ -10,32 +9,21 @@ import example._
import zio._
object Api {
trait Service {
def routes: Route
}
lazy val live: ZLayer[Contributors, Nothing, Api] =
ZLayer.fromService { contributorService =>
lazy val live: ZLayer[ContributorsCache, Nothing, Api] =
ZLayer.fromService { contributorsCache =>
new Service with ZIOSupport {
implicit val apiErrorResponse: ErrorResponse[ApiError] = {
case GithubUnavailable(_) => HttpResponse(StatusCodes.InternalServerError)
case NoContributors(_) => HttpResponse(StatusCodes.BadRequest)
}
def routes =
pathPrefix("contributors") {
path(Segment / Segment) { (organization, repository) =>
get {
complete {
contributorService.getContributors(organization, repository)
}
val routes =
path("repositories" / Segment / Segment / "contributors") { (owner, name) =>
get {
complete {
contributorsCache.fetchAll(Repository(Owner(owner), Name(name)))
}
}
}
}
}
val routes: URIO[Api, Route] = ZIO.access[Api](api => Route.seal(api.get.routes))
}
......@@ -6,7 +6,7 @@ import zio.config.ConfigDescriptor
import zio.config.magnolia.DeriveConfigDescriptor
import zio.redis.RedisConfig
final case class AppConfig(api: HttpServer.Config, redis: RedisConfig)
final case class AppConfig(redis: RedisConfig, server: HttpServer.Config)
object AppConfig {
val descriptor: ConfigDescriptor[AppConfig] = DeriveConfigDescriptor.descriptor[AppConfig]
......
import zio.Has
package object example {
type Contributors = Has[Contributors.Service]
type ContributorsCache = Has[ContributorsCache.Service]
type Contributions = Contributions.Type
type Login = Login.Type
type Owner = Owner.Type
type Name = Name.Type
}
......@@ -7,35 +7,7 @@ import scalafix.sbt.ScalafixPlugin.autoImport._
object BuildHelper {
val Scala212 = "2.12.12"
val Scala213 = "2.13.3"
private val stdOptions =
Seq(
"-deprecation",
"-encoding",
"UTF-8",
"-feature",
"-unchecked",
"-Xfatal-warnings"
)
private val std2xOptions =
Seq(
"-language:higherKinds",
"-language:existentials",
"-explaintypes",
"-Yrangepos",
"-Xlint:_,-missing-interpolator,-type-parameter-shadow",
"-Ywarn-numeric-widen",
"-Ywarn-value-discard"
)
private def optimizerOptions(optimize: Boolean) =
if (optimize)
Seq(
"-opt:l:inline",
"-opt-inline-from:zio.internal.**"
)
else Nil
val Zio = "1.0.3+90-84fbcdcb-SNAPSHOT"
def buildInfoSettings(packageName: String) =
Seq(
......@@ -44,47 +16,20 @@ object BuildHelper {
buildInfoObject := "BuildInfo"
)
def extraOptions(scalaVersion: String, optimize: Boolean) =
CrossVersion.partialVersion(scalaVersion) match {
case Some((2, 13)) =>
Seq(
"-Ywarn-unused:params,-implicits"
) ++ std2xOptions ++ optimizerOptions(optimize)
case Some((2, 12)) =>
Seq(
"-opt-warnings",
"-Ywarn-extra-implicit",
"-Ywarn-unused:_,imports",
"-Ywarn-unused:imports",
"-Ypartial-unification",
"-Yno-adapted-args",
"-Ywarn-inaccessible",
"-Ywarn-infer-any",
"-Ywarn-nullary-override",
"-Ywarn-nullary-unit",
"-Ywarn-unused:params,-implicits",
"-Xfuture",
"-Xsource:2.13",
"-Xmax-classfile-name",
"242"
) ++ std2xOptions ++ optimizerOptions(optimize)
case _ => Seq.empty
}
def stdSettings(prjName: String) =
Seq(
name := s"$prjName",
crossScalaVersions := Seq(Scala212, Scala213),
ThisBuild / scalaVersion := Scala213,
scalacOptions := stdOptions ++ extraOptions(scalaVersion.value, optimize = !isSnapshot.value),
ThisBuild / semanticdbEnabled := true,
ThisBuild / semanticdbOptions += "-P:semanticdb:synthetics:on",
ThisBuild / semanticdbVersion := scalafixSemanticdb.revision, // use Scalafix compatible version
ThisBuild / semanticdbVersion := scalafixSemanticdb.revision,
ThisBuild / scalafixScalaBinaryVersion := CrossVersion.binaryScalaVersion(scalaVersion.value),
ThisBuild / scalafixDependencies ++= List(
"com.github.liancheng" %% "organize-imports" % "0.4.4",
"com.github.vovapolu" %% "scaluzzi" % "0.1.16"
),
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
parallelExecution in Test := true,
incOptions ~= (_.withLogRecompileOnMacro(false)),
autoAPIMappings := true
......
......@@ -3,6 +3,7 @@ addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.24"
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.10.0")
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.4")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.0")
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.15")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.2.13")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.0")
......@@ -2,8 +2,8 @@ package zio.redis
import java.nio.charset.StandardCharsets
import zio.stream.{ Sink, ZSink }
import zio.{ Chunk, IO, Ref }
import zio.stream.Sink
import zio.{ Chunk, IO }
sealed trait RespValue extends Any {
......@@ -117,8 +117,8 @@ object RespValue {
IntDeserializer.flatMap {
case size if size >= 0 =>
for {
bytes <- sinkTake[Byte](size.toInt)
_ <- sinkTake[Byte](2) // crlf terminator
bytes <- Sink.take[Byte](size.toInt)
_ <- Sink.take[Byte](2)
} yield BulkString(bytes)
case -1 =>
Sink.succeed(NullValue)
......@@ -140,7 +140,7 @@ object RespValue {
}
val Deserializer: Sink[RedisError.ProtocolError, Byte, Byte, RespValue] =
sinkTake[Byte](1).flatMap { header =>
Sink.take[Byte](1).flatMap { header =>
header.head match {
case Header.simpleString => SimpleStringDeserializer.map(SimpleString)
case Header.error => SimpleStringDeserializer.map(Error)
......@@ -159,32 +159,4 @@ object RespValue {
case _ => None
}
}
/**
* Fixed version of `ZSink.take`.
*
* This will be removed once https://github.com/zio/zio/pull/4342 is available.
*/
private def sinkTake[I](n: Int): ZSink[Any, Nothing, I, I, Chunk[I]] =
ZSink {
for {
state <- Ref.make[Chunk[I]](Chunk.empty).toManaged_
push = (is: Option[Chunk[I]]) =>
state.get.flatMap { take =>
is match {
case Some(ch) =>
val idx = n - take.length
if (idx <= ch.length) {
val (chunk, leftover) = ch.splitAt(idx)
state.set(Chunk.empty) *> ZSink.Push.emit(take ++ chunk, leftover)
} else
state.set(take ++ ch) *> ZSink.Push.more
case None =>
if (n >= 0) ZSink.Push.emit(take, Chunk.empty)
else ZSink.Push.emit(Chunk.empty, take)
}
}
} yield push
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册