Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
梦境迷离
Zio Redis
比较版本
af2db87382ff732066747eb22cc2dbefdc08cc75...08b69e8dbaf6cdf60e5de44b4f516fce04e9787c
Z
Zio Redis
项目概览
梦境迷离
/
Zio Redis
9 个月 前同步成功
通知
4
Star
2
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Z
Zio Redis
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
源分支
08b69e8dbaf6cdf60e5de44b4f516fce04e9787c
选择Git版本
...
目标分支
af2db87382ff732066747eb22cc2dbefdc08cc75
选择Git版本
比较
Commits (7)
https://gitcode.net/qq_34446485/zio-redis/-/commit/15858bba8b6aa213ab42ca57f8a1fb2b65e93627
Update zio-schema, zio-schema-protobuf to 0.4.10 (#795)
2023-03-31T06:17:45+00:00
Scala Steward
43047562+scala-steward@users.noreply.github.com
https://gitcode.net/qq_34446485/zio-redis/-/commit/fb1e84589f635e96d41284eb42fa5273287c2a94
Update zio, zio-json to 3.8.14 (#793)
2023-03-31T08:11:19+00:00
Scala Steward
43047562+scala-steward@users.noreply.github.com
https://gitcode.net/qq_34446485/zio-redis/-/commit/a993b0b51ba2b8b6fb88f4f9d2461f5ed49dda5a
Update redis4cats-effects to 1.4.1 (#794)
2023-03-31T08:38:19+00:00
Scala Steward
43047562+scala-steward@users.noreply.github.com
https://gitcode.net/qq_34446485/zio-redis/-/commit/266abfb38e5216fbd74ce84cd8f0af37f40ff927
Remote UTF8 decoder (#796)
2023-03-31T15:23:45+02:00
Dejan Mijić
dmijic@acm.org
https://gitcode.net/qq_34446485/zio-redis/-/commit/940ce275828c0f985f0bdb0f1afc7ed5c2498449
Use ZIO option constructors (#797)
2023-03-31T16:13:40+02:00
Dejan Mijić
dmijic@acm.org
https://gitcode.net/qq_34446485/zio-redis/-/commit/dabc405ab4b2d19a828ae94c8a93cb21467be2b8
Use Chunk.foldLeft (#798)
2023-03-31T18:05:12+02:00
Dejan Mijić
dmijic@acm.org
https://gitcode.net/qq_34446485/zio-redis/-/commit/08b69e8dbaf6cdf60e5de44b4f516fce04e9787c
Use specialized chunk builders (#799)
2023-03-31T21:57:12+02:00
Dejan Mijić
dmijic@acm.org
隐藏空白更改
内联
并排
Showing
6 changed file
with
142 addition
and
104 deletion
+142
-104
build.sbt
build.sbt
+3
-3
modules/redis/src/main/scala/zio/redis/Output.scala
modules/redis/src/main/scala/zio/redis/Output.scala
+3
-1
modules/redis/src/main/scala/zio/redis/SingleNodeExecutor.scala
...s/redis/src/main/scala/zio/redis/SingleNodeExecutor.scala
+7
-12
modules/redis/src/main/scala/zio/redis/internal/RespValue.scala
...s/redis/src/main/scala/zio/redis/internal/RespValue.scala
+106
-53
modules/redis/src/test/scala/zio/redis/internal/RespValueSpec.scala
...dis/src/test/scala/zio/redis/internal/RespValueSpec.scala
+22
-34
project/BuildHelper.scala
project/BuildHelper.scala
+1
-1
未找到文件。
build.sbt
浏览文件 @
08b69e8d
...
...
@@ -85,7 +85,7 @@ lazy val benchmarks =
crossScalaVersions
-=
Scala3
,
publish
/
skip
:=
true
,
libraryDependencies
++=
List
(
"dev.profunktor"
%%
"redis4cats-effects"
%
"1.4.
0
"
,
"dev.profunktor"
%%
"redis4cats-effects"
%
"1.4.
1
"
,
"io.chrisdavenport"
%%
"rediculous"
%
"0.4.0"
,
"io.laserdisc"
%%
"laserdisc-fs2"
%
"0.6.0"
,
"dev.zio"
%%
"zio-schema-protobuf"
%
zioSchemaVersion
...
...
@@ -100,8 +100,8 @@ lazy val example =
.
settings
(
publish
/
skip
:=
true
,
libraryDependencies
++=
List
(
"com.softwaremill.sttp.client3"
%%
"zio"
%
"3.8.1
3
"
,
"com.softwaremill.sttp.client3"
%%
"zio-json"
%
"3.8.1
3
"
,
"com.softwaremill.sttp.client3"
%%
"zio"
%
"3.8.1
4
"
,
"com.softwaremill.sttp.client3"
%%
"zio-json"
%
"3.8.1
4
"
,
"dev.zio"
%%
"zio-streams"
%
zioVersion
,
"dev.zio"
%%
"zio-config-magnolia"
%
"3.0.7"
,
"dev.zio"
%%
"zio-config-typesafe"
%
"3.0.7"
,
...
...
modules/redis/src/main/scala/zio/redis/Output.scala
浏览文件 @
08b69e8d
...
...
@@ -22,6 +22,8 @@ import zio.redis.options.Cluster.{Node, Partition, SlotRange}
import
zio.schema.Schema
import
zio.schema.codec.BinaryCodec
import
java.nio.charset.StandardCharsets
sealed
trait
Output
[
+A
]
{
self
=>
protected
def
tryDecode
(
respValue
:
RespValue
)
:
A
...
...
@@ -718,7 +720,7 @@ object Output {
}
private
def
decodeDouble
(
bytes
:
Chunk
[
Byte
])
:
Double
=
{
val
text
=
RespValue
.
decode
(
bytes
)
val
text
=
new
String
(
bytes
.
toArray
,
StandardCharsets
.
UTF_8
)
try
text
.
toDouble
catch
{
case
_:
NumberFormatException
=>
throw
ProtocolError
(
s
"'$text' isn't a double."
)
...
...
modules/redis/src/main/scala/zio/redis/SingleNodeExecutor.scala
浏览文件 @
08b69e8d
...
...
@@ -46,23 +46,18 @@ final class SingleNodeExecutor private (
private
def
drainWith
(
e
:
RedisError
)
:
UIO
[
Unit
]
=
responses
.
takeAll
.
flatMap
(
ZIO
.
foreachDiscard
(
_
)(
_
.
fail
(
e
)))
private
def
send
:
IO
[
RedisError.IOError
,
Option
[
Unit
]]
=
requests
.
takeBetween
(
1
,
RequestQueueSize
).
flatMap
{
reqs
=>
val
buffer
=
ChunkBuilder
.
make
[
Byte
]()
val
it
=
reqs
.
iterator
while
(
it
.
hasNext
)
{
val
req
=
it
.
next
()
buffer
++=
RespValue
.
Array
(
req
.
command
).
serialize
}
val
bytes
=
buffer
.
result
()
requests
.
takeBetween
(
1
,
RequestQueueSize
).
flatMap
{
requests
=>
val
bytes
=
requests
.
foldLeft
(
new
ChunkBuilder
.
Byte
())((
buffer
,
req
)
=>
buffer
++=
RespValue
.
Array
(
req
.
command
).
asBytes
)
.
result
()
connection
.
write
(
bytes
)
.
mapError
(
RedisError
.
IOError
(
_
))
.
tapBoth
(
e
=>
ZIO
.
foreachDiscard
(
reqs
)(
_
.
promise
.
fail
(
e
)),
_
=>
ZIO
.
foreachDiscard
(
reqs
)(
req
=>
responses
.
offer
(
req
.
promise
))
e
=>
ZIO
.
foreachDiscard
(
req
uest
s
)(
_
.
promise
.
fail
(
e
)),
_
=>
ZIO
.
foreachDiscard
(
req
uest
s
)(
req
=>
responses
.
offer
(
req
.
promise
))
)
}
...
...
modules/redis/src/main/scala/zio/redis/internal/RespValue.scala
浏览文件 @
08b69e8d
...
...
@@ -27,24 +27,61 @@ private[redis] sealed trait RespValue extends Product with Serializable { self =
import
RespValue._
import
RespValue.internal.
{
CrLf
,
Headers
,
NullArrayEncoded
,
NullStringEncoded
}
final
def
serialize
:
Chunk
[
Byte
]
=
final
def
asBytes
:
Chunk
[
Byte
]
=
self
match
{
case
NullBulkString
=>
NullStringEncoded
case
NullArray
=>
NullArrayEncoded
case
SimpleString
(
s
)
=>
Headers
.
SimpleString
+:
encode
(
s
)
case
Error
(
s
)
=>
Headers
.
Error
+:
encode
(
s
)
case
Integer
(
i
)
=>
Headers
.
Integer
+:
encode
(
i
.
toString
)
case
NullBulkString
=>
NullStringEncoded
case
NullArray
=>
NullArrayEncoded
case
SimpleString
(
s
)
=>
val
builder
=
new
ChunkBuilder
.
Byte
()
builder
+=
Headers
.
SimpleString
builder
++=
encode
(
s
)
builder
++=
CrLf
builder
.
result
()
case
Error
(
s
)
=>
val
builder
=
new
ChunkBuilder
.
Byte
()
builder
+=
Headers
.
Error
builder
++=
encode
(
s
)
builder
++=
CrLf
builder
.
result
()
case
Integer
(
i
)
=>
val
builder
=
new
ChunkBuilder
.
Byte
()
builder
+=
Headers
.
Integer
builder
++=
encode
(
i
.
toString
)
builder
++=
CrLf
builder
.
result
()
case
BulkString
(
bytes
)
=>
Headers
.
BulkString
+:
(
encode
(
bytes
.
length
.
toString
)
++
bytes
++
CrLf
)
val
builder
=
new
ChunkBuilder
.
Byte
()
builder
+=
Headers
.
BulkString
builder
++=
encode
(
bytes
.
length
.
toString
)
builder
++=
CrLf
builder
++=
bytes
builder
++=
CrLf
builder
.
result
()
case
Array
(
elements
)
=>
val
data
=
elements
.
foldLeft
[
Chunk
[
Byte
]](
Chunk
.
empty
)(
_
++
_
.
serialize
)
Headers
.
Array
+:
(
encode
(
elements
.
size
.
toString
)
++
data
)
val
builder
=
new
ChunkBuilder
.
Byte
()
builder
+=
Headers
.
Array
builder
++=
encode
(
elements
.
size
.
toString
)
builder
++=
CrLf
elements
.
foreach
(
builder
++=
_
.
asBytes
)
builder
.
result
()
}
private
[
this
]
def
encode
(
s
:
String
)
:
Chunk
[
Byte
]
=
Chunk
.
fromArray
(
s
.
getBytes
(
StandardCharsets
.
US_ASCII
))
++
CrLf
private
[
this
]
def
encode
(
s
:
String
)
=
s
.
getBytes
(
StandardCharsets
.
US_ASCII
)
}
private
[
redis
]
object
RespValue
{
...
...
@@ -72,9 +109,9 @@ private[redis] object RespValue {
final
case
class
Integer
(
value
:
Long
)
extends
RespValue
final
case
class
BulkString
(
value
:
Chunk
[
Byte
])
extends
RespValue
{
def
asLong
:
Long
=
internal
.
unsafeReadLong
(
asString
,
0
)
def
asLong
:
Long
=
internal
.
unsafeReadLong
(
value
,
0
)
def
asString
:
String
=
decode
(
value
)
def
asString
:
String
=
internal
.
decode
(
value
)
}
final
case
class
Array
(
values
:
Chunk
[
RespValue
])
extends
RespValue
...
...
@@ -96,24 +133,20 @@ private[redis] object RespValue {
// ZSink fold will return a State.Start when contFn is false
val
lineProcessor
=
ZSink
.
fold
[
String
,
State
](
State
.
Start
)(
_
.
inProgress
)(
_
feed
_
).
mapZIO
{
case
State
.
Done
(
value
)
=>
ZIO
.
s
ucceed
(
Some
(
value
)
)
ZSink
.
fold
Chunks
[
Byte
,
State
](
State
.
Start
)(
_
.
inProgress
)(
_
feed
_
).
mapZIO
{
case
State
.
Done
(
value
)
=>
ZIO
.
s
ome
(
value
)
case
State
.
Failed
=>
ZIO
.
fail
(
RedisError
.
ProtocolError
(
"Invalid data received."
))
case
State
.
Start
=>
ZIO
.
succeed
(
None
)
case
State
.
Start
=>
ZIO
.
none
case
other
=>
ZIO
.
dieMessage
(
s
"Deserialization bug, should not get $other"
)
}
(
ZPipeline
.
utf8Decode
>>>
ZPipeline
.
splitOn
(
internal
.
CrLfString
))
.
mapError
(
e
=>
RedisError
.
ProtocolError
(
e
.
getLocalizedMessage
))
.
andThen
(
ZPipeline
.
fromSink
(
lineProcessor
))
ZPipeline
.
splitOnChunk
(
internal
.
CrLf
)
>>>
ZPipeline
.
fromSink
(
lineProcessor
)
}
def
array
(
values
:
RespValue*
)
:
Array
=
Array
(
Chunk
.
fromIterable
(
values
))
def
bulkString
(
s
:
String
)
:
BulkString
=
BulkString
(
Chunk
.
fromArray
(
s
.
getBytes
(
StandardCharsets
.
UTF_8
)))
def
decode
(
bytes
:
Chunk
[
Byte
])
:
String
=
new
String
(
bytes
.
toArray
,
StandardCharsets
.
UTF_8
)
private
object
internal
{
object
Headers
{
final
val
SimpleString
:
Byte
=
'+'
...
...
@@ -124,11 +157,10 @@ private[redis] object RespValue {
}
final
val
CrLf
:
Chunk
[
Byte
]
=
Chunk
(
'\r'
,
'\n'
)
final
val
CrLfString
:
String
=
"\r\n"
final
val
NullArrayEncoded
:
Chunk
[
Byte
]
=
Chunk
.
fromArray
(
"*-1\r\n"
.
getBytes
(
StandardCharsets
.
US_ASCII
))
final
val
NullArrayPrefix
:
String
=
"*-1"
final
val
NullStringEncoded
:
Chunk
[
Byte
]
=
Chunk
.
fromArray
(
"$-1\r\n"
.
getBytes
(
StandardCharsets
.
US_ASCII
))
final
val
NullStringPrefix
:
String
=
"$-1"
final
val
NullArrayEncoded
:
Chunk
[
Byte
]
=
Chunk
(
'*'
,
'-'
,
'1'
,
'\r'
,
'\n'
)
final
val
NullArrayPrefix
:
Chunk
[
Byte
]
=
Chunk
(
'*'
,
'-'
,
'1'
)
final
val
NullStringEncoded
:
Chunk
[
Byte
]
=
Chunk
(
'$'
,
'-'
,
'1'
,
'\r'
,
'\n'
)
final
val
NullStringPrefix
:
Chunk
[
Byte
]
=
Chunk
(
'$'
,
'-'
,
'1'
)
sealed
trait
State
{
self
=>
import
State._
...
...
@@ -139,22 +171,22 @@ private[redis] object RespValue {
case
_
=>
true
}
final
def
feed
(
line
:
String
)
:
State
=
final
def
feed
(
bytes
:
Chunk
[
Byte
]
)
:
State
=
self
match
{
case
Start
if
line
.
isEmpty
()
=>
Start
case
Start
if
line
==
NullStringPrefix
=>
Done
(
NullBulkString
)
case
Start
if
line
==
NullArrayPrefix
=>
Done
(
NullArray
)
case
Start
if
line
.
nonEmpty
=>
line
.
head
match
{
case
Headers
.
SimpleString
=>
Done
(
SimpleString
(
line
.
tail
))
case
Headers
.
Error
=>
Done
(
Error
(
line
.
tail
))
case
Headers
.
Integer
=>
Done
(
Integer
(
unsafeReadLong
(
line
,
1
)))
case
Start
if
bytes
.
isEmpty
=>
Start
case
Start
if
bytes
==
NullStringPrefix
=>
Done
(
NullBulkString
)
case
Start
if
bytes
==
NullArrayPrefix
=>
Done
(
NullArray
)
case
Start
if
bytes
.
nonEmpty
=>
bytes
.
head
match
{
case
Headers
.
SimpleString
=>
Done
(
SimpleString
(
decode
(
bytes
.
tail
)
))
case
Headers
.
Error
=>
Done
(
Error
(
decode
(
bytes
.
tail
)
))
case
Headers
.
Integer
=>
Done
(
Integer
(
unsafeReadLong
(
bytes
,
1
)))
case
Headers
.
BulkString
=>
val
size
=
unsafeRead
Long
(
line
,
1
).
toInt
CollectingBulkString
(
size
,
new
StringBuilder
(
size
))
val
size
=
unsafeRead
Size
(
bytes
)
CollectingBulkString
(
size
,
ChunkBuilder
.
make
(
size
))
case
Headers
.
Array
=>
val
size
=
unsafeRead
Long
(
line
,
1
).
toInt
val
size
=
unsafeRead
Size
(
bytes
)
if
(
size
>
0
)
CollectingArray
(
size
,
ChunkBuilder
.
make
(
size
),
Start
.
feed
)
...
...
@@ -165,18 +197,20 @@ private[redis] object RespValue {
}
case
CollectingArray
(
rem
,
vals
,
next
)
=>
next
(
line
)
match
{
next
(
bytes
)
match
{
case
Done
(
v
)
if
rem
>
1
=>
CollectingArray
(
rem
-
1
,
vals
+=
v
,
Start
.
feed
)
case
Done
(
v
)
=>
Done
(
Array
((
vals
+=
v
).
result
()))
case
state
=>
CollectingArray
(
rem
,
vals
,
state
.
feed
)
}
case
CollectingBulkString
(
rem
,
vals
)
=>
if
(
line
.
length
>=
rem
)
{
val
stringValue
=
vals
.
append
(
line
.
substring
(
0
,
rem
)).
toString
Done
(
BulkString
(
Chunk
.
fromArray
(
stringValue
.
getBytes
(
StandardCharsets
.
UTF_8
)
)))
if
(
bytes
.
length
>=
rem
)
{
val
s
++=
bytes
.
take
(
rem
)
Done
(
BulkString
(
vals
.
result
(
)))
}
else
{
CollectingBulkString
(
rem
-
line
.
length
-
2
,
vals
.
append
(
line
).
append
(
CrLfString
))
vals
++=
bytes
vals
++=
CrLf
CollectingBulkString
(
rem
-
bytes
.
length
-
2
,
vals
)
}
case
_
=>
Failed
...
...
@@ -184,31 +218,50 @@ private[redis] object RespValue {
}
object
State
{
case
object
Start
extends
State
case
object
Failed
extends
State
final
case
class
CollectingArray
(
rem
:
Int
,
vals
:
ChunkBuilder
[
RespValue
],
next
:
String
=>
State
)
extends
State
final
case
class
CollectingBulkString
(
rem
:
Int
,
vals
:
StringBuilder
)
extends
State
final
case
class
Done
(
value
:
RespValue
)
extends
State
case
object
Start
extends
State
case
object
Failed
extends
State
final
case
class
CollectingArray
(
rem
:
Int
,
vals
:
ChunkBuilder
[
RespValue
],
next
:
Chunk
[
Byte
]
=>
State
)
extends
State
final
case
class
CollectingBulkString
(
rem
:
Int
,
vals
:
ChunkBuilder
[
Byte
])
extends
State
final
case
class
Done
(
value
:
RespValue
)
extends
State
}
def
unsafeReadLong
(
text
:
String
,
startFrom
:
Int
)
:
Long
=
{
def
decode
(
bytes
:
Chunk
[
Byte
])
:
String
=
new
String
(
bytes
.
toArray
,
StandardCharsets
.
UTF_8
)
def
unsafeReadLong
(
bytes
:
Chunk
[
Byte
],
startFrom
:
Int
)
:
Long
=
{
var
pos
=
startFrom
var
res
=
0L
var
neg
=
false
if
(
text
.
charAt
(
pos
)
==
'-'
)
{
if
(
bytes
(
pos
)
==
'-'
)
{
neg
=
true
pos
+=
1
}
val
len
=
text
.
length
val
len
=
bytes
.
length
while
(
pos
<
len
)
{
res
=
res
*
10
+
text
.
charAt
(
pos
)
-
'0'
res
=
res
*
10
+
bytes
(
pos
)
-
'0'
pos
+=
1
}
if
(
neg
)
-
res
else
res
}
def
unsafeReadSize
(
bytes
:
Chunk
[
Byte
])
:
Int
=
{
var
pos
=
1
var
res
=
0
val
len
=
bytes
.
length
while
(
pos
<
len
)
{
res
=
res
*
10
+
bytes
(
pos
)
-
'0'
pos
+=
1
}
res
}
}
}
modules/redis/src/test/scala/zio/redis/internal/RespValueSpec.scala
浏览文件 @
08b69e8d
...
...
@@ -2,47 +2,35 @@ package zio.redis.internal
import
zio.Chunk
import
zio.redis._
import
zio.stream.ZStream
import
zio.test.Assertion._
import
zio.test._
import
java.nio.charset.StandardCharsets
object
RespValueSpec
extends
BaseSpec
{
def
spec
:
Spec
[
Any
,
RedisError.ProtocolError
]
=
suite
(
"RespValue"
)(
suite
(
"serialization"
)(
test
(
"array"
)
{
val
expected
=
Chunk
.
fromArray
(
"*3\r\n$3\r\nabc\r\n:123\r\n$-1\r\n"
.
getBytes
(
StandardCharsets
.
UTF_8
))
val
v
=
RespValue
.
array
(
RespValue
.
bulkString
(
"abc"
),
RespValue
.
Integer
(
123
),
RespValue
.
NullBulkString
)
assert
(
v
.
serialize
)(
equalTo
(
expected
))
}
),
suite
(
"deserialization"
)(
test
(
"array"
)
{
val
values
=
Chunk
(
RespValue
.
SimpleString
(
"OK"
),
test
(
"serializes and deserializes messages"
)
{
val
values
=
Chunk
(
RespValue
.
SimpleString
(
"OK"
),
RespValue
.
bulkString
(
"test1"
),
RespValue
.
array
(
RespValue
.
bulkString
(
"test1"
),
RespValue
.
array
(
RespValue
.
bulkString
(
"test1"
),
RespValue
.
Integer
(
42L
),
RespValue
.
NullBulkString
,
RespValue
.
array
(
RespValue
.
SimpleString
(
"a"
),
RespValue
.
Integer
(
0L
)),
RespValue
.
bulkString
(
"in array"
),
RespValue
.
SimpleString
(
"test2"
)
),
RespValue
.
NullBulkString
)
RespValue
.
Integer
(
42L
),
RespValue
.
NullBulkString
,
RespValue
.
array
(
RespValue
.
SimpleString
(
"a"
),
RespValue
.
Integer
(
0L
)),
RespValue
.
bulkString
(
"in array"
),
RespValue
.
SimpleString
(
"test2"
)
),
RespValue
.
NullBulkString
)
zio
.
stream
.
ZStream
.
fromChunk
(
values
)
.
mapConcat
(
_
.
serialize
)
.
via
(
RespValue
.
Decoder
)
.
collect
{
case
Some
(
value
)
=>
value
}
.
runCollect
.
map
(
assert
(
_
)(
equalTo
(
values
)))
}
)
ZStream
.
fromChunk
(
values
)
.
mapConcat
(
_
.
asBytes
)
.
via
(
RespValue
.
Decoder
)
.
collectSome
.
runCollect
.
map
(
assert
(
_
)(
equalTo
(
values
)))
}
)
}
project/BuildHelper.scala
浏览文件 @
08b69e8d
...
...
@@ -31,7 +31,7 @@ object BuildHelper {
val
Scala3
=
versions
(
"3"
)
val
zioVersion
=
"2.0.10"
val
zioSchemaVersion
=
"0.4.
9
"
val
zioSchemaVersion
=
"0.4.
10
"
def
buildInfoSettings
(
packageName
:
String
)
=
List
(
...
...