未验证 提交 49dcd103 编写于 作者: 梦境迷离's avatar 梦境迷离 提交者: GitHub

Add support for the XINFO command (#301)

上级 fce0b47d
......@@ -280,6 +280,21 @@ object Input {
Chunk(encodeString("DELCONSUMER"), encodeString(data.key), encodeString(data.group), encodeString(data.consumer))
}
case object XInfoGroupInput extends Input[XInfoCommand.Group] {
def encode(data: XInfoCommand.Group): Chunk[RespValue.BulkString] =
Chunk(encodeString("GROUPS"), encodeString(data.key))
}
case object XInfoStreamInput extends Input[XInfoCommand.Stream] {
def encode(data: XInfoCommand.Stream): Chunk[RespValue.BulkString] =
Chunk(encodeString("STREAM"), encodeString(data.key))
}
case object XInfoConsumerInput extends Input[XInfoCommand.Consumer] {
def encode(data: XInfoCommand.Consumer): Chunk[RespValue.BulkString] =
Chunk(encodeString("CONSUMERS"), encodeString(data.key), encodeString(data.group))
}
case object BlockInput extends Input[Duration] {
def encode(data: Duration): Chunk[RespValue.BulkString] =
Chunk(encodeString("BLOCK"), encodeString(data.toMillis.toString))
......
......@@ -292,21 +292,158 @@ object Output {
case object StreamOutput extends Output[Map[String, Map[String, String]]] {
protected def tryDecode(respValue: RespValue): Map[String, Map[String, String]] =
respValue match {
case RespValue.NullArray => Map.empty[String, Map[String, String]]
case RespValue.Array(entities) =>
val output = collection.mutable.Map.empty[String, Map[String, String]]
entities.foreach {
case RespValue.Array(Seq(id @ RespValue.BulkString(_), value)) =>
output += (id.asString -> KeyValueOutput.unsafeDecode(value))
case RespValue.NullArray => Map.empty[String, Map[String, String]]
case RespValue.Array(entities) => extractMapMap(entities)
case other => throw ProtocolError(s"$other isn't an array")
}
}
private def extractMapMap(entities: Chunk[RespValue]): Map[String, Map[String, String]] = {
val output = collection.mutable.Map.empty[String, Map[String, String]]
entities.foreach {
case RespValue.Array(Seq(id @ RespValue.BulkString(_), value)) =>
output += (id.asString -> KeyValueOutput.unsafeDecode(value))
case other =>
throw ProtocolError(s"$other isn't a valid array")
}
output.toMap
}
case object StreamGroupInfoOutput extends Output[Chunk[StreamGroupInfo]] {
override protected def tryDecode(respValue: RespValue): Chunk[StreamGroupInfo] =
respValue match {
case RespValue.NullArray => Chunk.empty
case RespValue.Array(messages) =>
messages.collect {
// Note that you should not rely on the fields exact position. see https://redis.io/commands/xinfo
case RespValue.Array(elements) if elements.length % 2 == 0 =>
var streamGroupInfo: StreamGroupInfo = StreamGroupInfo.empty
val len = elements.length
var pos = 0
while (pos < len) {
(elements(pos), elements(pos + 1)) match {
case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) =>
if (key.asString == XInfoFields.Name)
streamGroupInfo = streamGroupInfo.copy(name = Some(value.asString))
else if (key.asString == XInfoFields.LastDelivered)
streamGroupInfo = streamGroupInfo.copy(lastDeliveredId = Some(value.asString))
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
if (key.asString == XInfoFields.Pending)
streamGroupInfo = streamGroupInfo.copy(pending = Some(value.value))
else if (key.asString == XInfoFields.Consumers)
streamGroupInfo = streamGroupInfo.copy(consumers = Some(value.value))
case _ =>
}
pos += 2
}
streamGroupInfo
case array @ RespValue.Array(_) =>
throw ProtocolError(s"$array doesn't have an even number of elements")
case other =>
throw ProtocolError(s"$other isn't a valid array")
throw ProtocolError(s"$other isn't an array")
}
output.toMap
case other => throw ProtocolError(s"$other isn't an array")
case other =>
throw ProtocolError(s"$other isn't an array")
}
}
case object StreamConsumerInfoOutput extends Output[Chunk[StreamConsumerInfo]] {
override protected def tryDecode(respValue: RespValue): Chunk[StreamConsumerInfo] =
respValue match {
case RespValue.NullArray => Chunk.empty
case RespValue.Array(messages) =>
messages.collect {
// Note that you should not rely on the fields exact position. see https://redis.io/commands/xinfo
case RespValue.Array(elements) if elements.length % 2 == 0 =>
var streamConsumerInfo: StreamConsumerInfo = StreamConsumerInfo.empty
val len = elements.length
var pos = 0
while (pos < len) {
(elements(pos), elements(pos + 1)) match {
case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_))
if key.asString == XInfoFields.Name =>
streamConsumerInfo = streamConsumerInfo.copy(name = Some(value.asString))
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
if (key.asString == XInfoFields.Pending)
streamConsumerInfo = streamConsumerInfo.copy(pending = Some(value.value))
else if (key.asString == XInfoFields.Idle)
streamConsumerInfo = streamConsumerInfo.copy(idle = Some(value.value))
case _ =>
}
pos += 2
}
streamConsumerInfo
case array @ RespValue.Array(_) =>
throw ProtocolError(s"$array doesn't have an even number of elements")
case other =>
throw ProtocolError(s"$other isn't an array")
}
case other =>
throw ProtocolError(s"$other isn't an array")
}
}
case object StreamInfoOutput extends Output[StreamInfo] {
override protected def tryDecode(respValue: RespValue): StreamInfo = {
var streamInfo: StreamInfo = StreamInfo.empty
respValue match {
// Note that you should not rely on the fields exact position. see https://redis.io/commands/xinfo
case RespValue.Array(elements) if elements.length % 2 == 0 =>
val len = elements.length
var pos = 0
while (pos < len) {
(elements(pos), elements(pos + 1)) match {
case (key @ RespValue.BulkString(_), value @ RespValue.Integer(_)) =>
if (key.asString == XInfoFields.Length)
streamInfo = streamInfo.copy(length = Some(value.value))
else if (key.asString == XInfoFields.RadixTreeNodes)
streamInfo = streamInfo.copy(radixTreeNodes = Some(value.value))
else if (key.asString == XInfoFields.RadixTreeKeys)
streamInfo = streamInfo.copy(radixTreeKeys = Some(value.value))
else if (key.asString == XInfoFields.Groups)
streamInfo = streamInfo.copy(groups = Some(value.value))
case (key @ RespValue.BulkString(_), value @ RespValue.BulkString(_))
if key.asString == XInfoFields.LastGeneratedId =>
streamInfo = streamInfo.copy(lastGeneratedId = Some(value.asString))
case (key @ RespValue.BulkString(_), value @ RespValue.Array(_)) =>
if (key.asString == XInfoFields.FirstEntry)
streamInfo = streamInfo.copy(firstEntry = Some(extractStreamEntry(value)))
else if (key.asString == XInfoFields.LastEntry)
streamInfo = streamInfo.copy(lastEntry = Some(extractStreamEntry(value)))
case _ =>
}
pos += 2
}
streamInfo
case array @ RespValue.Array(_) =>
throw ProtocolError(s"$array doesn't have an even number of elements")
case other =>
throw ProtocolError(s"$other isn't an array")
}
}
}
private def extractStreamEntry(es: RespValue): StreamEntry = {
val entry = collection.mutable.Map.empty[String, String]
var entryId: String = ""
es match {
case RespValue.Array(entities) =>
entities.foreach {
case id @ RespValue.BulkString(_) => entryId = id.asString
case RespValue.ArrayValues(id @ RespValue.BulkString(_), value @ RespValue.BulkString(_)) =>
entry += (id.asString -> value.asString)
case other =>
throw ProtocolError(s"$other isn't a valid array")
}
case other =>
throw ProtocolError(s"$other isn't a valid array")
}
StreamEntry(id = entryId, entry.toMap)
}
case object XPendingOutput extends Output[PendingInfo] {
protected def tryDecode(respValue: RespValue): PendingInfo =
respValue match {
......
package zio.redis.api
import zio.duration._
import zio.redis.Input._
import zio.redis.Output._
import zio.redis.Input.{ XInfoConsumerInput, _ }
import zio.redis.Output.{ StreamGroupInfoOutput, _ }
import zio.redis._
import zio.{ Chunk, ZIO }
......@@ -39,6 +39,38 @@ trait Streams {
): ZIO[RedisExecutor, RedisError, String] =
XAdd.run((key, None, id, (pair, pairs.toList)))
/**
* An introspection command used in order to retrieve different information about the group.
*
* @param key ID of the stream
* @return List of consumer groups associated with the stream stored at the specified key.
*/
final def xInfoGroup(
key: String
): ZIO[RedisExecutor, RedisError, Chunk[StreamGroupInfo]] = XInfoGroups.run(XInfoCommand.Group(key))
/**
* An introspection command used in order to retrieve different information about the consumers.
*
* @param key ID of the stream
* @param group ID of the consumer group
* @return List of every consumer in a specific consumer group.
*/
final def xInfoConsumers(
key: String,
group: String
): ZIO[RedisExecutor, RedisError, Chunk[StreamConsumerInfo]] = XInfoConsumers.run(XInfoCommand.Consumer(key, group))
/**
* An introspection command used in order to retrieve different information about the stream.
*
* @param key ID of the stream
* @return General information about the stream stored at the specified key.
*/
final def xInfoStream(
key: String
): ZIO[RedisExecutor, RedisError, StreamInfo] = XInfoStream.run(XInfoCommand.Stream(key))
/**
* Appends the specified stream entry to the stream at the specified key while limiting the size of the stream.
*
......@@ -468,7 +500,14 @@ private object Streams {
final val XGroupDelConsumer: RedisCommand[XGroupCommand.DelConsumer, Long] =
RedisCommand("XGROUP", XGroupDelConsumerInput, LongOutput)
// TODO: implement XINFO command
final val XInfoGroups: RedisCommand[XInfoCommand.Group, Chunk[StreamGroupInfo]] =
RedisCommand("XINFO", XInfoGroupInput, StreamGroupInfoOutput)
final val XInfoStream: RedisCommand[XInfoCommand.Stream, StreamInfo] =
RedisCommand("XINFO", XInfoStreamInput, StreamInfoOutput)
final val XInfoConsumers: RedisCommand[XInfoCommand.Consumer, Chunk[StreamConsumerInfo]] =
RedisCommand("XINFO", XInfoConsumerInput, StreamConsumerInfoOutput)
final val XLen: RedisCommand[String, Long] = RedisCommand("XLEN", StringInput, LongOutput)
......
......@@ -3,6 +3,7 @@ package zio.redis.options
import zio.duration._
trait Streams {
case object WithForce {
private[redis] def stringify = "FORCE"
}
......@@ -18,11 +19,28 @@ trait Streams {
sealed trait XGroupCommand
object XGroupCommand {
case class Create(key: String, group: String, id: String, mkStream: Boolean) extends XGroupCommand
case class SetId(key: String, group: String, id: String) extends XGroupCommand
case class Destroy(key: String, group: String) extends XGroupCommand
case class CreateConsumer(key: String, group: String, consumer: String) extends XGroupCommand
case class DelConsumer(key: String, group: String, consumer: String) extends XGroupCommand
case class SetId(key: String, group: String, id: String) extends XGroupCommand
case class Destroy(key: String, group: String) extends XGroupCommand
case class CreateConsumer(key: String, group: String, consumer: String) extends XGroupCommand
case class DelConsumer(key: String, group: String, consumer: String) extends XGroupCommand
}
sealed trait XInfoCommand
object XInfoCommand {
case class Group(key: String) extends XInfoCommand
case class Stream(key: String) extends XInfoCommand
case class Consumer(key: String, group: String) extends XInfoCommand
}
case object MkStream {
......@@ -54,4 +72,63 @@ trait Streams {
type NoAck = NoAck.type
case class MaxLen(approximate: Boolean, count: Long)
case class StreamEntry(id: String, fields: Map[String, String])
case class StreamInfo(
length: Option[Long],
radixTreeKeys: Option[Long],
radixTreeNodes: Option[Long],
groups: Option[Long],
lastGeneratedId: Option[String],
firstEntry: Option[StreamEntry],
lastEntry: Option[StreamEntry]
)
object StreamInfo {
def empty: StreamInfo = StreamInfo(None, None, None, None, None, None, None)
}
case class StreamGroupInfo(
name: Option[String],
consumers: Option[Long],
pending: Option[Long],
lastDeliveredId: Option[String]
)
object StreamGroupInfo {
def empty: StreamGroupInfo = StreamGroupInfo(None, None, None, None)
}
case class StreamConsumerInfo(
name: Option[String],
idle: Option[Long],
pending: Option[Long]
)
object StreamConsumerInfo {
def empty: StreamConsumerInfo = StreamConsumerInfo(None, None, None)
}
private[redis] object XInfoFields {
val Name: String = "name"
val Idle: String = "idle"
val Pending: String = "pending"
val Consumers: String = "consumers"
val LastDelivered: String = "last-delivered-id"
val Length: String = "length"
val RadixTreeKeys: String = "radix-tree-keys"
val RadixTreeNodes: String = "radix-tree-nodes"
val Groups: String = "groups"
val LastGeneratedId: String = "last-generated-id"
val FirstEntry: String = "first-entry"
val LastEntry: String = "last-entry"
val Entries: String = "entries"
val PelCount: String = "pel-count"
val SeenTime: String = "seen-time"
}
}
......@@ -947,6 +947,20 @@ object InputSpec extends BaseSpec {
.map(assert(_)(equalTo(respArgs("STREAMS", "a", "c", "b", "d"))))
}
),
suite("XInfo")(
testM("stream info") {
Task(XInfoStreamInput.encode(XInfoCommand.Stream("key")))
.map(assert(_)(equalTo(respArgs("STREAM", "key"))))
},
testM("group info") {
Task(XInfoGroupInput.encode(XInfoCommand.Group("key")))
.map(assert(_)(equalTo(respArgs("GROUPS", "key"))))
},
testM("consumer info") {
Task(XInfoConsumerInput.encode(XInfoCommand.Consumer("key", "group")))
.map(assert(_)(equalTo(respArgs("CONSUMERS", "key", "group"))))
}
),
suite("Group")(
testM("valid value") {
Task(GroupInput.encode(Group("group", "consumer")))
......
......@@ -2,7 +2,7 @@ package zio.redis
import zio.duration._
import zio.redis.Output._
import zio.redis.RedisError._
import zio.redis.RedisError.{ ProtocolError, _ }
import zio.test.Assertion._
import zio.test._
import zio.{ Chunk, Task }
......@@ -498,7 +498,159 @@ object OutputSpec extends BaseSpec {
)
)
Task(XReadOutput.unsafeDecode(input)).either.map(assert(_)(isLeft(isSubtype[ProtocolError](anything))))
}
},
suite("xInfoStream commands output")(
testM("extract valid value") {
val resp = RespValue.array(
RespValue.bulkString("length"),
RespValue.Integer(1),
RespValue.bulkString("radix-tree-keys"),
RespValue.Integer(2),
RespValue.bulkString("radix-tree-nodes"),
RespValue.Integer(3),
RespValue.bulkString("groups"),
RespValue.Integer(2),
RespValue.bulkString("last-generated-id"),
RespValue.bulkString("1-111"),
RespValue.bulkString("first-entry"),
RespValue.array(
RespValue.bulkString("id"),
RespValue.bulkString("1-0"),
RespValue.array(
RespValue.bulkString("key1"),
RespValue.bulkString("value1")
)
),
RespValue.bulkString("last-entry"),
RespValue.array(
RespValue.bulkString("id"),
RespValue.bulkString("1-0"),
RespValue.array(
RespValue.bulkString("key2"),
RespValue.bulkString("value2")
)
)
)
Task(StreamInfoOutput.unsafeDecode(resp)).map { f =>
assert(f)(
equalTo(
StreamInfo(
length = Option(1),
radixTreeKeys = Option(2),
radixTreeNodes = Option(3),
groups = Option(2),
lastGeneratedId = Option("1-111"),
firstEntry = Some(StreamEntry("1-0", Map("key1" -> "value1"))),
lastEntry = Some(StreamEntry("1-0", Map("key2" -> "value2")))
)
)
)
}
},
testM("success when message mis value") {
val resp = RespValue.array(
RespValue.bulkString("length"),
RespValue.Integer(1),
RespValue.bulkString("radix-tree-nodes"),
RespValue.Integer(3)
)
Task(StreamInfoOutput.unsafeDecode(resp)).map { f =>
assert(f)(
equalTo(
StreamInfo(Some(1), None, Some(3), None, None, None, None)
)
)
}
}
),
suite("xInfoGroup commands output")(
testM("extract valid value") {
val resp = RespValue.array(
RespValue.array(
RespValue.bulkString("name"),
RespValue.bulkString("name"),
RespValue.bulkString("consumers"),
RespValue.Integer(10),
RespValue.bulkString("pending"),
RespValue.Integer(100),
RespValue.bulkString("last-delivered-id"),
RespValue.bulkString("1111")
),
RespValue.array(
RespValue.bulkString("name"),
RespValue.bulkString("name2"),
RespValue.bulkString("consumers"),
RespValue.Integer(110),
RespValue.bulkString("pending"),
RespValue.Integer(1100),
RespValue.bulkString("last-delivered-id"),
RespValue.bulkString("1111")
)
)
Task(StreamGroupInfoOutput.unsafeDecode(resp)).map(x =>
assert(x)(
equalTo(
Chunk.apply(
StreamGroupInfo(Some("name"), Some(10), Some(100), Some("1111")),
StreamGroupInfo(Some("name2"), Some(110), Some(1100), Some("1111"))
)
)
)
)
},
testM("empty array when xinfo groups") {
val resp = RespValue.array()
Task(StreamGroupInfoOutput.unsafeDecode(resp)).map { f =>
assert(f)(
equalTo(
Chunk.empty
)
)
}
},
testM("empty array when xinfo consumers") {
val resp = RespValue.array()
Task(StreamConsumerInfoOutput.unsafeDecode(resp)).map { f =>
assert(f)(
equalTo(
Chunk.empty
)
)
}
}
),
suite("XInfoConsumers commands output")(
testM("extract valid value") {
val resp = RespValue.array(
RespValue.array(
RespValue.bulkString("name"),
RespValue.bulkString("name"),
RespValue.bulkString("pending"),
RespValue.Integer(100),
RespValue.bulkString("idle"),
RespValue.Integer(10)
),
RespValue.array(
RespValue.bulkString("name"),
RespValue.bulkString("name2"),
RespValue.bulkString("pending"),
RespValue.Integer(1100),
RespValue.bulkString("idle"),
RespValue.Integer(10)
)
)
Task(StreamConsumerInfoOutput.unsafeDecode(resp)).map(x =>
assert(x)(
equalTo(
Chunk.apply(
StreamConsumerInfo(Some("name"), Some(10), Some(100)),
StreamConsumerInfo(Some("name2"), Some(10), Some(1100))
)
)
)
)
}
)
)
)
......
......@@ -1230,6 +1230,50 @@ trait StreamsSpec extends BaseSpec {
result <- xTrim(stream, 1000L).either
} yield assert(result)(isLeft(isSubtype[WrongType](anything)))
}
),
suite("XInfo")(
testM("stream info") {
for {
stream <- uuid
group <- uuid
_ <- xGroupCreate(stream, group, "$", mkStream = true)
id <- xAdd(stream, "*", "a" -> "b")
result <- xInfoStream(stream)
} yield assert(result.lastEntry.map(_.id))(equalTo(Some(id)))
},
testM("ERR no such key") {
for {
stream <- uuid
result <- xInfoStream(stream).either
} yield assert(result)(isLeft(isSubtype[ProtocolError](anything)))
},
testM("ERR not a stream") {
for {
maybeStream <- uuid
_ <- set(maybeStream, "helloworld")
result <- xInfoStream(maybeStream).either
} yield assert(result)(isLeft(isSubtype[WrongType](anything)))
},
testM("consumers info") {
for {
stream <- uuid
group <- uuid
consumer <- uuid
_ <- xGroupCreate(stream, group, "$", mkStream = true)
_ <- xAdd(stream, "*", "a" -> "b")
_ <- xReadGroup(group, consumer)(stream -> ">")
result <- xInfoConsumers(stream, group)
} yield assert(result.toList.head.name)(equalTo(Some(consumer)))
},
testM("groups info") {
for {
stream <- uuid
group <- uuid
_ <- xGroupCreate(stream, group, "$", mkStream = true)
_ <- xAdd(stream, "*", "a" -> "b")
result <- xInfoGroup(stream)
} yield assert(result.toList.head.name)(equalTo(Some(group)))
}
)
)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册