SingleNodeExecutor.scala 3.7 KB
Newer Older
A
Anatoly Sergeev 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright 2021 John A. De Goes and the ZIO contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package zio.redis

import zio._
import zio.redis.SingleNodeExecutor._
21
import zio.redis.internal.{RedisConnection, RespCommand, RespValue}
A
Anatoly Sergeev 已提交
22

23 24 25 26
final class SingleNodeExecutor private (
  connection: RedisConnection,
  requests: Queue[Request],
  responses: Queue[Promise[RedisError, RespValue]]
A
Anatoly Sergeev 已提交
27 28 29
) extends RedisExecutor {

  // TODO NodeExecutor doesn't throw connection errors, timeout errors, it is hanging forever
30
  def execute(command: RespCommand): IO[RedisError, RespValue] =
A
Anatoly Sergeev 已提交
31 32
    Promise
      .make[RedisError, RespValue]
33
      .flatMap(promise => requests.offer(Request(command.args.map(_.value), promise)) *> promise.await)
A
Anatoly Sergeev 已提交
34 35 36 37 38

  /**
   * Opens a connection to the server and launches send and receive operations. All failures are retried by opening a
   * new connection. Only exits by interruption or defect.
   */
39
  private val run: IO[RedisError, AnyVal] =
A
Anatoly Sergeev 已提交
40
    ZIO.logTrace(s"$this Executable sender and reader has been started") *>
A
Anatoly Sergeev 已提交
41
      (send.repeat[Any, Long](Schedule.forever) race receive)
A
Anatoly Sergeev 已提交
42 43 44 45
        .tapError(e => ZIO.logWarning(s"Reconnecting due to error: $e") *> drainWith(e))
        .retryWhile(True)
        .tapError(e => ZIO.logError(s"Executor exiting: $e"))

46
  private def drainWith(e: RedisError): UIO[Unit] = responses.takeAll.flatMap(ZIO.foreachDiscard(_)(_.fail(e)))
A
Anatoly Sergeev 已提交
47 48

  private def send: IO[RedisError.IOError, Option[Unit]] =
D
Dejan Mijić 已提交
49 50 51
    requests.takeBetween(1, RequestQueueSize).flatMap { requests =>
      val bytes =
        requests
52
          .foldLeft(new ChunkBuilder.Byte())((buffer, req) => buffer ++= RespValue.Array(req.command).asBytes)
D
Dejan Mijić 已提交
53
          .result()
A
Anatoly Sergeev 已提交
54 55 56 57 58

      connection
        .write(bytes)
        .mapError(RedisError.IOError(_))
        .tapBoth(
D
Dejan Mijić 已提交
59 60
          e => ZIO.foreachDiscard(requests)(_.promise.fail(e)),
          _ => ZIO.foreachDiscard(requests)(req => responses.offer(req.promise))
A
Anatoly Sergeev 已提交
61 62 63 64 65 66
        )
    }

  private def receive: IO[RedisError, Unit] =
    connection.read
      .mapError(RedisError.IOError(_))
67
      .via(RespValue.Decoder)
A
Anatoly Sergeev 已提交
68
      .collectSome
69
      .foreach(response => responses.take.flatMap(_.succeed(response)))
A
Anatoly Sergeev 已提交
70 71 72 73

}

object SingleNodeExecutor {
74 75
  lazy val layer: ZLayer[RedisConfig, RedisError.IOError, RedisExecutor] =
    RedisConnection.layer >>> makeLayer
A
Anatoly Sergeev 已提交
76

77 78
  lazy val local: ZLayer[Any, RedisError.IOError, RedisExecutor] =
    RedisConnection.local >>> makeLayer
A
Anatoly Sergeev 已提交
79

80
  private final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue])
A
Anatoly Sergeev 已提交
81 82 83 84 85 86 87

  private final val True: Any => Boolean = _ => true

  private final val RequestQueueSize = 16

  private[redis] def create(connection: RedisConnection): URIO[Scope, SingleNodeExecutor] =
    for {
88 89 90 91 92
      requests  <- Queue.bounded[Request](RequestQueueSize)
      responses <- Queue.unbounded[Promise[RedisError, RespValue]]
      executor   = new SingleNodeExecutor(connection, requests, responses)
      _         <- executor.run.forkScoped
      _         <- logScopeFinalizer(s"$executor Node Executor is closed")
A
Anatoly Sergeev 已提交
93 94
    } yield executor

95 96
  private def makeLayer: ZLayer[RedisConnection, RedisError.IOError, RedisExecutor] =
    ZLayer.scoped(ZIO.serviceWithZIO[RedisConnection](create))
A
Anatoly Sergeev 已提交
97
}