...
 
Commits (7)
    https://gitcode.net/qq_34446485/zio-redis/-/commit/15858bba8b6aa213ab42ca57f8a1fb2b65e93627 Update zio-schema, zio-schema-protobuf to 0.4.10 (#795) 2023-03-31T06:17:45+00:00 Scala Steward 43047562+scala-steward@users.noreply.github.com https://gitcode.net/qq_34446485/zio-redis/-/commit/fb1e84589f635e96d41284eb42fa5273287c2a94 Update zio, zio-json to 3.8.14 (#793) 2023-03-31T08:11:19+00:00 Scala Steward 43047562+scala-steward@users.noreply.github.com https://gitcode.net/qq_34446485/zio-redis/-/commit/a993b0b51ba2b8b6fb88f4f9d2461f5ed49dda5a Update redis4cats-effects to 1.4.1 (#794) 2023-03-31T08:38:19+00:00 Scala Steward 43047562+scala-steward@users.noreply.github.com https://gitcode.net/qq_34446485/zio-redis/-/commit/266abfb38e5216fbd74ce84cd8f0af37f40ff927 Remote UTF8 decoder (#796) 2023-03-31T15:23:45+02:00 Dejan Mijić dmijic@acm.org https://gitcode.net/qq_34446485/zio-redis/-/commit/940ce275828c0f985f0bdb0f1afc7ed5c2498449 Use ZIO option constructors (#797) 2023-03-31T16:13:40+02:00 Dejan Mijić dmijic@acm.org https://gitcode.net/qq_34446485/zio-redis/-/commit/dabc405ab4b2d19a828ae94c8a93cb21467be2b8 Use Chunk.foldLeft (#798) 2023-03-31T18:05:12+02:00 Dejan Mijić dmijic@acm.org https://gitcode.net/qq_34446485/zio-redis/-/commit/08b69e8dbaf6cdf60e5de44b4f516fce04e9787c Use specialized chunk builders (#799) 2023-03-31T21:57:12+02:00 Dejan Mijić dmijic@acm.org
......@@ -85,7 +85,7 @@ lazy val benchmarks =
crossScalaVersions -= Scala3,
publish / skip := true,
libraryDependencies ++= List(
"dev.profunktor" %% "redis4cats-effects" % "1.4.0",
"dev.profunktor" %% "redis4cats-effects" % "1.4.1",
"io.chrisdavenport" %% "rediculous" % "0.4.0",
"io.laserdisc" %% "laserdisc-fs2" % "0.6.0",
"dev.zio" %% "zio-schema-protobuf" % zioSchemaVersion
......@@ -100,8 +100,8 @@ lazy val example =
.settings(
publish / skip := true,
libraryDependencies ++= List(
"com.softwaremill.sttp.client3" %% "zio" % "3.8.13",
"com.softwaremill.sttp.client3" %% "zio-json" % "3.8.13",
"com.softwaremill.sttp.client3" %% "zio" % "3.8.14",
"com.softwaremill.sttp.client3" %% "zio-json" % "3.8.14",
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-config-magnolia" % "3.0.7",
"dev.zio" %% "zio-config-typesafe" % "3.0.7",
......
......@@ -22,6 +22,8 @@ import zio.redis.options.Cluster.{Node, Partition, SlotRange}
import zio.schema.Schema
import zio.schema.codec.BinaryCodec
import java.nio.charset.StandardCharsets
sealed trait Output[+A] { self =>
protected def tryDecode(respValue: RespValue): A
......@@ -718,7 +720,7 @@ object Output {
}
private def decodeDouble(bytes: Chunk[Byte]): Double = {
val text = RespValue.decode(bytes)
val text = new String(bytes.toArray, StandardCharsets.UTF_8)
try text.toDouble
catch {
case _: NumberFormatException => throw ProtocolError(s"'$text' isn't a double.")
......
......@@ -46,23 +46,18 @@ final class SingleNodeExecutor private (
private def drainWith(e: RedisError): UIO[Unit] = responses.takeAll.flatMap(ZIO.foreachDiscard(_)(_.fail(e)))
private def send: IO[RedisError.IOError, Option[Unit]] =
requests.takeBetween(1, RequestQueueSize).flatMap { reqs =>
val buffer = ChunkBuilder.make[Byte]()
val it = reqs.iterator
while (it.hasNext) {
val req = it.next()
buffer ++= RespValue.Array(req.command).serialize
}
val bytes = buffer.result()
requests.takeBetween(1, RequestQueueSize).flatMap { requests =>
val bytes =
requests
.foldLeft(new ChunkBuilder.Byte())((buffer, req) => buffer ++= RespValue.Array(req.command).asBytes)
.result()
connection
.write(bytes)
.mapError(RedisError.IOError(_))
.tapBoth(
e => ZIO.foreachDiscard(reqs)(_.promise.fail(e)),
_ => ZIO.foreachDiscard(reqs)(req => responses.offer(req.promise))
e => ZIO.foreachDiscard(requests)(_.promise.fail(e)),
_ => ZIO.foreachDiscard(requests)(req => responses.offer(req.promise))
)
}
......
......@@ -27,24 +27,61 @@ private[redis] sealed trait RespValue extends Product with Serializable { self =
import RespValue._
import RespValue.internal.{CrLf, Headers, NullArrayEncoded, NullStringEncoded}
final def serialize: Chunk[Byte] =
final def asBytes: Chunk[Byte] =
self match {
case NullBulkString => NullStringEncoded
case NullArray => NullArrayEncoded
case SimpleString(s) => Headers.SimpleString +: encode(s)
case Error(s) => Headers.Error +: encode(s)
case Integer(i) => Headers.Integer +: encode(i.toString)
case NullBulkString => NullStringEncoded
case NullArray => NullArrayEncoded
case SimpleString(s) =>
val builder = new ChunkBuilder.Byte()
builder += Headers.SimpleString
builder ++= encode(s)
builder ++= CrLf
builder.result()
case Error(s) =>
val builder = new ChunkBuilder.Byte()
builder += Headers.Error
builder ++= encode(s)
builder ++= CrLf
builder.result()
case Integer(i) =>
val builder = new ChunkBuilder.Byte()
builder += Headers.Integer
builder ++= encode(i.toString)
builder ++= CrLf
builder.result()
case BulkString(bytes) =>
Headers.BulkString +: (encode(bytes.length.toString) ++ bytes ++ CrLf)
val builder = new ChunkBuilder.Byte()
builder += Headers.BulkString
builder ++= encode(bytes.length.toString)
builder ++= CrLf
builder ++= bytes
builder ++= CrLf
builder.result()
case Array(elements) =>
val data = elements.foldLeft[Chunk[Byte]](Chunk.empty)(_ ++ _.serialize)
Headers.Array +: (encode(elements.size.toString) ++ data)
val builder = new ChunkBuilder.Byte()
builder += Headers.Array
builder ++= encode(elements.size.toString)
builder ++= CrLf
elements.foreach(builder ++= _.asBytes)
builder.result()
}
private[this] def encode(s: String): Chunk[Byte] =
Chunk.fromArray(s.getBytes(StandardCharsets.US_ASCII)) ++ CrLf
private[this] def encode(s: String) = s.getBytes(StandardCharsets.US_ASCII)
}
private[redis] object RespValue {
......@@ -72,9 +109,9 @@ private[redis] object RespValue {
final case class Integer(value: Long) extends RespValue
final case class BulkString(value: Chunk[Byte]) extends RespValue {
def asLong: Long = internal.unsafeReadLong(asString, 0)
def asLong: Long = internal.unsafeReadLong(value, 0)
def asString: String = decode(value)
def asString: String = internal.decode(value)
}
final case class Array(values: Chunk[RespValue]) extends RespValue
......@@ -96,24 +133,20 @@ private[redis] object RespValue {
// ZSink fold will return a State.Start when contFn is false
val lineProcessor =
ZSink.fold[String, State](State.Start)(_.inProgress)(_ feed _).mapZIO {
case State.Done(value) => ZIO.succeed(Some(value))
ZSink.foldChunks[Byte, State](State.Start)(_.inProgress)(_ feed _).mapZIO {
case State.Done(value) => ZIO.some(value)
case State.Failed => ZIO.fail(RedisError.ProtocolError("Invalid data received."))
case State.Start => ZIO.succeed(None)
case State.Start => ZIO.none
case other => ZIO.dieMessage(s"Deserialization bug, should not get $other")
}
(ZPipeline.utf8Decode >>> ZPipeline.splitOn(internal.CrLfString))
.mapError(e => RedisError.ProtocolError(e.getLocalizedMessage))
.andThen(ZPipeline.fromSink(lineProcessor))
ZPipeline.splitOnChunk(internal.CrLf) >>> ZPipeline.fromSink(lineProcessor)
}
def array(values: RespValue*): Array = Array(Chunk.fromIterable(values))
def bulkString(s: String): BulkString = BulkString(Chunk.fromArray(s.getBytes(StandardCharsets.UTF_8)))
def decode(bytes: Chunk[Byte]): String = new String(bytes.toArray, StandardCharsets.UTF_8)
private object internal {
object Headers {
final val SimpleString: Byte = '+'
......@@ -124,11 +157,10 @@ private[redis] object RespValue {
}
final val CrLf: Chunk[Byte] = Chunk('\r', '\n')
final val CrLfString: String = "\r\n"
final val NullArrayEncoded: Chunk[Byte] = Chunk.fromArray("*-1\r\n".getBytes(StandardCharsets.US_ASCII))
final val NullArrayPrefix: String = "*-1"
final val NullStringEncoded: Chunk[Byte] = Chunk.fromArray("$-1\r\n".getBytes(StandardCharsets.US_ASCII))
final val NullStringPrefix: String = "$-1"
final val NullArrayEncoded: Chunk[Byte] = Chunk('*', '-', '1', '\r', '\n')
final val NullArrayPrefix: Chunk[Byte] = Chunk('*', '-', '1')
final val NullStringEncoded: Chunk[Byte] = Chunk('$', '-', '1', '\r', '\n')
final val NullStringPrefix: Chunk[Byte] = Chunk('$', '-', '1')
sealed trait State { self =>
import State._
......@@ -139,22 +171,22 @@ private[redis] object RespValue {
case _ => true
}
final def feed(line: String): State =
final def feed(bytes: Chunk[Byte]): State =
self match {
case Start if line.isEmpty() => Start
case Start if line == NullStringPrefix => Done(NullBulkString)
case Start if line == NullArrayPrefix => Done(NullArray)
case Start if line.nonEmpty =>
line.head match {
case Headers.SimpleString => Done(SimpleString(line.tail))
case Headers.Error => Done(Error(line.tail))
case Headers.Integer => Done(Integer(unsafeReadLong(line, 1)))
case Start if bytes.isEmpty => Start
case Start if bytes == NullStringPrefix => Done(NullBulkString)
case Start if bytes == NullArrayPrefix => Done(NullArray)
case Start if bytes.nonEmpty =>
bytes.head match {
case Headers.SimpleString => Done(SimpleString(decode(bytes.tail)))
case Headers.Error => Done(Error(decode(bytes.tail)))
case Headers.Integer => Done(Integer(unsafeReadLong(bytes, 1)))
case Headers.BulkString =>
val size = unsafeReadLong(line, 1).toInt
CollectingBulkString(size, new StringBuilder(size))
val size = unsafeReadSize(bytes)
CollectingBulkString(size, ChunkBuilder.make(size))
case Headers.Array =>
val size = unsafeReadLong(line, 1).toInt
val size = unsafeReadSize(bytes)
if (size > 0)
CollectingArray(size, ChunkBuilder.make(size), Start.feed)
......@@ -165,18 +197,20 @@ private[redis] object RespValue {
}
case CollectingArray(rem, vals, next) =>
next(line) match {
next(bytes) match {
case Done(v) if rem > 1 => CollectingArray(rem - 1, vals += v, Start.feed)
case Done(v) => Done(Array((vals += v).result()))
case state => CollectingArray(rem, vals, state.feed)
}
case CollectingBulkString(rem, vals) =>
if (line.length >= rem) {
val stringValue = vals.append(line.substring(0, rem)).toString
Done(BulkString(Chunk.fromArray(stringValue.getBytes(StandardCharsets.UTF_8))))
if (bytes.length >= rem) {
vals ++= bytes.take(rem)
Done(BulkString(vals.result()))
} else {
CollectingBulkString(rem - line.length - 2, vals.append(line).append(CrLfString))
vals ++= bytes
vals ++= CrLf
CollectingBulkString(rem - bytes.length - 2, vals)
}
case _ => Failed
......@@ -184,31 +218,50 @@ private[redis] object RespValue {
}
object State {
case object Start extends State
case object Failed extends State
final case class CollectingArray(rem: Int, vals: ChunkBuilder[RespValue], next: String => State) extends State
final case class CollectingBulkString(rem: Int, vals: StringBuilder) extends State
final case class Done(value: RespValue) extends State
case object Start extends State
case object Failed extends State
final case class CollectingArray(rem: Int, vals: ChunkBuilder[RespValue], next: Chunk[Byte] => State)
extends State
final case class CollectingBulkString(rem: Int, vals: ChunkBuilder[Byte]) extends State
final case class Done(value: RespValue) extends State
}
def unsafeReadLong(text: String, startFrom: Int): Long = {
def decode(bytes: Chunk[Byte]): String = new String(bytes.toArray, StandardCharsets.UTF_8)
def unsafeReadLong(bytes: Chunk[Byte], startFrom: Int): Long = {
var pos = startFrom
var res = 0L
var neg = false
if (text.charAt(pos) == '-') {
if (bytes(pos) == '-') {
neg = true
pos += 1
}
val len = text.length
val len = bytes.length
while (pos < len) {
res = res * 10 + text.charAt(pos) - '0'
res = res * 10 + bytes(pos) - '0'
pos += 1
}
if (neg) -res else res
}
def unsafeReadSize(bytes: Chunk[Byte]): Int = {
var pos = 1
var res = 0
val len = bytes.length
while (pos < len) {
res = res * 10 + bytes(pos) - '0'
pos += 1
}
res
}
}
}
......@@ -2,47 +2,35 @@ package zio.redis.internal
import zio.Chunk
import zio.redis._
import zio.stream.ZStream
import zio.test.Assertion._
import zio.test._
import java.nio.charset.StandardCharsets
object RespValueSpec extends BaseSpec {
def spec: Spec[Any, RedisError.ProtocolError] =
suite("RespValue")(
suite("serialization")(
test("array") {
val expected = Chunk.fromArray("*3\r\n$3\r\nabc\r\n:123\r\n$-1\r\n".getBytes(StandardCharsets.UTF_8))
val v = RespValue.array(RespValue.bulkString("abc"), RespValue.Integer(123), RespValue.NullBulkString)
assert(v.serialize)(equalTo(expected))
}
),
suite("deserialization")(
test("array") {
val values = Chunk(
RespValue.SimpleString("OK"),
test("serializes and deserializes messages") {
val values = Chunk(
RespValue.SimpleString("OK"),
RespValue.bulkString("test1"),
RespValue.array(
RespValue.bulkString("test1"),
RespValue.array(
RespValue.bulkString("test1"),
RespValue.Integer(42L),
RespValue.NullBulkString,
RespValue.array(RespValue.SimpleString("a"), RespValue.Integer(0L)),
RespValue.bulkString("in array"),
RespValue.SimpleString("test2")
),
RespValue.NullBulkString
)
RespValue.Integer(42L),
RespValue.NullBulkString,
RespValue.array(RespValue.SimpleString("a"), RespValue.Integer(0L)),
RespValue.bulkString("in array"),
RespValue.SimpleString("test2")
),
RespValue.NullBulkString
)
zio.stream.ZStream
.fromChunk(values)
.mapConcat(_.serialize)
.via(RespValue.Decoder)
.collect { case Some(value) =>
value
}
.runCollect
.map(assert(_)(equalTo(values)))
}
)
ZStream
.fromChunk(values)
.mapConcat(_.asBytes)
.via(RespValue.Decoder)
.collectSome
.runCollect
.map(assert(_)(equalTo(values)))
}
)
}
......@@ -31,7 +31,7 @@ object BuildHelper {
val Scala3 = versions("3")
val zioVersion = "2.0.10"
val zioSchemaVersion = "0.4.9"
val zioSchemaVersion = "0.4.10"
def buildInfoSettings(packageName: String) =
List(
......