未验证 提交 6b492417 编写于 作者: D Dragutin Marjanović 提交者: GitHub

Remove useless connection commands (#825)

上级 617e6a2d
......@@ -44,12 +44,11 @@ lazy val redis =
.settings(stdSettings("zio-redis"))
.settings(
libraryDependencies ++= List(
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-schema" % zioSchemaVersion,
"dev.zio" %% "zio-schema-protobuf" % zioSchemaVersion % Test,
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.9.0"
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-schema" % zioSchemaVersion,
"dev.zio" %% "zio-schema-protobuf" % zioSchemaVersion % Test,
"dev.zio" %% "zio-test" % zioVersion % Test,
"dev.zio" %% "zio-test-sbt" % zioVersion % Test
),
testFrameworks := List(new TestFramework("zio.test.sbt.ZTestFramework"))
)
......
......@@ -139,63 +139,6 @@ object Input {
RespCommand(RespCommandArgument.Literal(data.asString))
}
case object ClientKillInput extends Input[ClientKillFilter] {
def encode(data: ClientKillFilter): RespCommand = data match {
case addr: ClientKillFilter.Address =>
RespCommand(RespCommandArgument.Literal("ADDR"), RespCommandArgument.Value(addr.asString))
case laddr: ClientKillFilter.LocalAddress =>
RespCommand(RespCommandArgument.Literal("LADDR"), RespCommandArgument.Value(laddr.asString))
case ClientKillFilter.Id(clientId) =>
RespCommand(RespCommandArgument.Literal("ID"), RespCommandArgument.Value(clientId.toString))
case ClientKillFilter.Type(clientType) =>
RespCommand(RespCommandArgument.Literal("TYPE"), RespCommandArgument.Literal(clientType.asString))
case ClientKillFilter.User(username) =>
RespCommand(RespCommandArgument.Literal("USER"), RespCommandArgument.Value(username))
case ClientKillFilter.SkipMe(skip) =>
RespCommand(RespCommandArgument.Literal("SKIPME"), RespCommandArgument.Literal(if (skip) "YES" else "NO"))
}
}
case object ClientPauseModeInput extends Input[ClientPauseMode] {
def encode(data: ClientPauseMode): RespCommand =
RespCommand(RespCommandArgument.Literal(data.asString))
}
case object ClientTrackingInput
extends Input[Option[(Option[Long], Option[ClientTrackingMode], Boolean, Chunk[String])]] {
def encode(
data: Option[(Option[Long], Option[ClientTrackingMode], Boolean, Chunk[String])]
): RespCommand =
data match {
case Some((clientRedir, mode, noLoop, prefixes)) =>
val modeChunk = mode match {
case Some(ClientTrackingMode.OptIn) => RespCommand(RespCommandArgument.Literal("OPTIN"))
case Some(ClientTrackingMode.OptOut) => RespCommand(RespCommandArgument.Literal("OPTOUT"))
case Some(ClientTrackingMode.Broadcast) => RespCommand(RespCommandArgument.Literal("BCAST"))
case None => RespCommand.empty
}
val loopChunk = if (noLoop) RespCommand(RespCommandArgument.Literal("NOLOOP")) else RespCommand.empty
RespCommand(RespCommandArgument.Literal("ON")) ++
clientRedir.fold(RespCommand.empty)(id =>
RespCommand(RespCommandArgument.Literal("REDIRECT"), RespCommandArgument.Value(id.toString))
) ++
RespCommand(
prefixes.flatMap(prefix =>
Chunk(RespCommandArgument.Literal("PREFIX"), RespCommandArgument.Value(prefix))
)
) ++
modeChunk ++
loopChunk
case None =>
RespCommand(RespCommandArgument.Literal("OFF"))
}
}
case object ClientTypeInput extends Input[ClientType] {
def encode(data: ClientType): RespCommand =
RespCommand(RespCommandArgument.Literal("TYPE"), RespCommandArgument.Literal(data.asString))
}
case object CommandNameInput extends Input[String] {
def encode(data: String): RespCommand =
RespCommand(RespCommandArgument.CommandName(data))
......@@ -616,11 +559,6 @@ object Input {
_9.encode(data._9) ++ _10.encode(data._10) ++ _11.encode(data._11)
}
case object UnblockBehaviorInput extends Input[UnblockBehavior] {
def encode(data: UnblockBehavior): RespCommand =
RespCommand(RespCommandArgument.Value(data.asString))
}
case object UpdateInput extends Input[Update] {
def encode(data: Update): RespCommand =
RespCommand(RespCommandArgument.Value(data.asString))
......@@ -735,9 +673,4 @@ object Input {
RespCommandArgument.Value(data.id)
)
}
case object YesNoInput extends Input[Boolean] {
def encode(data: Boolean): RespCommand =
RespCommand(RespCommandArgument.Literal(if (data) "YES" else "NO"))
}
}
......@@ -92,96 +92,6 @@ object Output {
}
}
case object ClientInfoOutput extends Output[ClientInfo] {
protected def tryDecode(respValue: RespValue): ClientInfo =
respValue match {
case RespValue.BulkString(s) => ClientInfo.from(s.asString)
case other => throw ProtocolError(s"$other isn't a bulk string")
}
}
case object ClientListOutput extends Output[Chunk[ClientInfo]] {
protected def tryDecode(respValue: RespValue): Chunk[ClientInfo] =
respValue match {
case RespValue.BulkString(s) => ClientInfo.from(s.asString.split("\r\n").filter(_.nonEmpty))
case other => throw ProtocolError(s"$other isn't a bulk string")
}
}
case object ClientTrackingInfoOutput extends Output[ClientTrackingInfo] {
protected def tryDecode(respValue: RespValue): ClientTrackingInfo =
respValue match {
case RespValue.NullArray => throw ProtocolError("Array must not be empty")
case RespValue.Array(values) if values.length % 2 == 0 =>
val fields = values.toList
.grouped(2)
.map {
case (bulk @ RespValue.BulkString(_)) :: value :: Nil => (bulk.asString, value)
case other => throw ProtocolError(s"$other isn't a valid format")
}
.toMap
ClientTrackingInfo(
fields
.get("flags")
.fold(throw ProtocolError("Missing flags field")) {
case RespValue.Array(value) =>
val set = value.map {
case bulk @ RespValue.BulkString(_) => bulk.asString
case other => throw ProtocolError(s"$other isn't a string")
}.toSet
ClientTrackingFlags(
set.contains("on"),
set match {
case s if s.contains("optin") => Some(ClientTrackingMode.OptIn)
case s if s.contains("optout") => Some(ClientTrackingMode.OptOut)
case s if s.contains("bcast") => Some(ClientTrackingMode.Broadcast)
case _ => None
},
set.contains("noloop"),
set match {
case s if s.contains("caching-yes") => Some(true)
case s if s.contains("caching-no") => Some(false)
case _ => None
},
set.contains("broken_redirect")
)
case other => throw ProtocolError(s"$other isn't an array with elements")
},
fields
.get("redirect")
.fold(throw ProtocolError("Missing redirect field")) {
case RespValue.Integer(-1L) => ClientTrackingRedirect.NotEnabled
case RespValue.Integer(0L) => ClientTrackingRedirect.NotRedirected
case RespValue.Integer(v) if v > 0L => ClientTrackingRedirect.RedirectedTo(v)
case other => throw ProtocolError(s"$other isn't an integer >= -1")
},
fields
.get("prefixes")
.fold(throw ProtocolError("Missing prefixes field")) {
case RespValue.NullArray => Set.empty[String]
case RespValue.Array(value) =>
value.map {
case bulk @ RespValue.BulkString(_) => bulk.asString
case other => throw ProtocolError(s"$other isn't a string")
}.toSet[String]
case other => throw ProtocolError(s"$other isn't an array")
}
)
case array @ RespValue.Array(_) => throw ProtocolError(s"$array doesn't have an even number of elements")
case other => throw ProtocolError(s"$other isn't an array")
}
}
case object ClientTrackingRedirectOutput extends Output[ClientTrackingRedirect] {
protected def tryDecode(respValue: RespValue): ClientTrackingRedirect =
respValue match {
case RespValue.Integer(-1L) => ClientTrackingRedirect.NotEnabled
case RespValue.Integer(0L) => ClientTrackingRedirect.NotRedirected
case RespValue.Integer(v) if v > 0L => ClientTrackingRedirect.RedirectedTo(v)
case other => throw ProtocolError(s"$other isn't an integer >= -1")
}
}
case object ClusterPartitionNodeOutput extends Output[Node] {
protected def tryDecode(respValue: RespValue): Node =
respValue match {
......
......@@ -60,21 +60,6 @@ trait Connection extends RedisEnvironment {
command.run(Auth(Some(username), password))
}
/**
* Controls the tracking of the keys in the next command executed by the connection, when tracking is enabled in Optin
* or Optout mode.
*
* @param track
* specifies whether to enable the tracking of the keys in the next command or not
* @return
* the Unit value.
*/
final def clientCaching(track: Boolean): IO[RedisError, Unit] = {
val command = RedisCommand(ClientCaching, YesNoInput, UnitOutput, executor)
command.run(track)
}
/**
* Returns the name of the current connection as set by clientSetName
*
......@@ -87,18 +72,6 @@ trait Connection extends RedisEnvironment {
command.run(())
}
/**
* Returns the client ID we are redirecting our tracking notifications to
*
* @return
* the client ID if the tracking is enabled and the notifications are being redirected
*/
final def clientGetRedir: IO[RedisError, ClientTrackingRedirect] = {
val command = RedisCommand(ClientGetRedir, NoInput, ClientTrackingRedirectOutput, executor)
command.run(())
}
/**
* Returns the ID of the current connection. Every connection ID has certain guarantees:
* - It is never repeated, so if clientID returns the same number, the caller can be sure that the underlying client
......@@ -115,112 +88,6 @@ trait Connection extends RedisEnvironment {
command.run(())
}
/**
* The command returns information and statistics about the current client connection in a mostly human readable
* format.
*
* @return
* information and statistics about the current client
*/
final def clientInfo: IO[RedisError, ClientInfo] = {
val command = RedisCommand(ClientInfo, NoInput, ClientInfoOutput, executor)
command.run(())
}
/**
* Closes a given client connection with the specified address
*
* @param address
* the address of the client to kill
* @return
* the Unit value.
*/
final def clientKill(address: Address): IO[RedisError, Unit] = {
val command = RedisCommand(ClientKill, AddressInput, UnitOutput, executor)
command.run(address)
}
/**
* Closes client connections with the specified filters.The following filters are available:
* - Address(ip, port). Kill all clients connected to specified address
* - LocalAddress(ip, port). Kill all clients connected to specified local (bind) address
* - Id(id). Allows to kill a client by its unique ID field. Client ID's are retrieved using the CLIENT LIST command
* - ClientType, where the type is one of normal, master, replica and pubsub. This closes the connections of all the
* clients in the specified class. Note that clients blocked into the MONITOR command are considered to belong to
* the normal class
* - User(username). Closes all the connections that are authenticated with the specified ACL username, however it
* returns an error if the username does not map to an existing ACL user
* - SkipMe(skip). By default this option is set to yes, that is, the client calling the command will not get
* killed, however setting this option to no will have the effect of also killing the client calling the command
* It is possible to provide multiple filters at the same time. The command will handle multiple filters via
* logical AND
*
* @param filters
* the specified filters for killing clients
* @return
* the number of clients killed.
*/
final def clientKill(filters: ClientKillFilter*): IO[RedisError, Long] = {
val command = RedisCommand(ClientKill, Varargs(ClientKillInput), LongOutput, executor)
command.run(filters)
}
/**
* The command returns information and statistics about the client connections server in a mostly human readable
* format.
*
* @param clientType
* filters the list by client's type
* @param clientIds
* filters the list by client IDs
* @return
* a chunk of information and statistics about clients
*/
final def clientList(
clientType: Option[ClientType] = None,
clientIds: Option[(Long, List[Long])] = None
): IO[RedisError, Chunk[ClientInfo]] = {
val command =
RedisCommand(
ClientList,
Tuple2(OptionalInput(ClientTypeInput), OptionalInput(IdsInput)),
ClientListOutput,
executor
)
command.run((clientType, clientIds))
}
/**
* Able to suspend all the Redis clients for the specified amount of time (in milliseconds). Currently supports two
* modes:
* - All: This is the default mode. All client commands are blocked
* - Write: Clients are only blocked if they attempt to execute a write command
*
* @param timeout
* the length of the pause in milliseconds
* @param mode
* option to specify the client pause mode
* @return
* the Unit value.
*/
final def clientPause(
timeout: Duration,
mode: Option[ClientPauseMode] = None
): IO[RedisError, Unit] = {
val command = RedisCommand(
ClientPause,
Tuple2(DurationMillisecondsInput, OptionalInput(ClientPauseModeInput)),
UnitOutput,
executor
)
command.run((timeout, mode))
}
/**
* Assigns a name to the current connection
*
......@@ -235,141 +102,6 @@ trait Connection extends RedisEnvironment {
command.run(name)
}
/**
* Returns information about the current client connection's use of the server assisted client side caching feature
*
* @return
* tracking information.
*/
final def clientTrackingInfo: IO[RedisError, ClientTrackingInfo] = {
val command = RedisCommand(ClientTrackingInfo, NoInput, ClientTrackingInfoOutput, executor)
command.run(())
}
/**
* Disables the tracking feature of the Redis server, that is used for server assisted client side caching
*
* @return
* the Unit value.
*/
final def clientTrackingOff: IO[RedisError, Unit] = {
val command = RedisCommand(ClientTracking, ClientTrackingInput, UnitOutput, executor)
command.run(None)
}
/**
* Enables the tracking feature of the Redis server, that is used for server assisted client side caching. The feature
* will remain active in the current connection for all its life, unless tracking is turned off with clientTrackingOff
*
* @param redirect
* the ID of the connection we want to send invalidation messages to
* @param trackingMode
* the mode used for tracking
* @param noLoop
* no loop option
* @param prefixes
* the prefixes registered
* @return
* the Unit value.
*/
final def clientTrackingOn(
redirect: Option[Long] = None,
trackingMode: Option[ClientTrackingMode] = None,
noLoop: Boolean = false,
prefixes: Set[String] = Set.empty
): IO[RedisError, Unit] = {
val command = RedisCommand(ClientTracking, ClientTrackingInput, UnitOutput, executor)
command.run(Some((redirect, trackingMode, noLoop, Chunk.fromIterable(prefixes))))
}
/**
* Unblocks, from a different connection, a client blocked in a blocking operation
*
* @param clientId
* the ID of the client to unblock
* @param error
* option to specify the unblocking behavior
* @return
* true if the client was unblocked successfully, or false if the client wasn't unblocked.
*/
final def clientUnblock(
clientId: Long,
error: Option[UnblockBehavior] = None
): IO[RedisError, Boolean] = {
val command =
RedisCommand(ClientUnblock, Tuple2(LongInput, OptionalInput(UnblockBehaviorInput)), BoolOutput, executor)
command.run((clientId, error))
}
/**
* Resumes command processing for all clients that were paused by clientPause
*
* @return
* the Unit value.
*/
final def clientUnpause: IO[RedisError, Unit] = {
val command = RedisCommand(ClientUnpause, NoInput, UnitOutput, executor)
command.run(())
}
/**
* Echoes the given string.
*
* @param message
* the message to be echoed
* @return
* the message.
*/
final def echo(message: String): IO[RedisError, String] = {
val command = RedisCommand(Echo, StringInput, MultiStringOutput, executor)
command.run(message)
}
/**
* Pings the server.
*
* @param message
* the optional message to receive back from server
* @return
* PONG if no argument is provided, otherwise return a copy of the argument as a bulk. This command is often used to
* test if a connection is still alive, or to measure latency.
*/
final def ping(message: Option[String] = None): IO[RedisError, String] = {
val command = RedisCommand(Ping, OptionalInput(StringInput), SingleOrMultiStringOutput, executor)
command.run(message)
}
/**
* Ask the server to close the connection. The connection is closed as soon as all pending replies have been written
* to the client
*
* @return
* the Unit value.
*/
final def quit: IO[RedisError, Unit] = {
val command = RedisCommand(Quit, NoInput, UnitOutput, executor)
command.run(())
}
/**
* Performs a full reset of the connection's server-side context, mimicking the effects of disconnecting and
* reconnecting again
*
* @return
* the Unit value.
*/
final def reset: IO[RedisError, Unit] = {
val command = RedisCommand(Reset, NoInput, ResetOutput, executor)
command.run(())
}
/**
* Changes the database for the current connection to the database having the specified numeric index. The currently
* selected database is a property of the connection; clients should track the selected database and re-select it on
......@@ -388,23 +120,9 @@ trait Connection extends RedisEnvironment {
}
private[redis] object Connection {
final val Auth = "AUTH"
final val ClientCaching = "CLIENT CACHING"
final val ClientGetName = "CLIENT GETNAME"
final val ClientGetRedir = "CLIENT GETREDIR"
final val ClientId = "CLIENT ID"
final val ClientInfo = "CLIENT INFO"
final val ClientKill = "CLIENT KILL"
final val ClientList = "CLIENT LIST"
final val ClientPause = "CLIENT PAUSE"
final val ClientSetName = "CLIENT SETNAME"
final val ClientTracking = "CLIENT TRACKING"
final val ClientTrackingInfo = "CLIENT TRACKINGINFO"
final val ClientUnblock = "CLIENT UNBLOCK"
final val ClientUnpause = "CLIENT UNPAUSE"
final val Echo = "ECHO"
final val Ping = "PING"
final val Quit = "QUIT"
final val Reset = "RESET"
final val Select = "SELECT"
final val Auth = "AUTH"
final val ClientGetName = "CLIENT GETNAME"
final val ClientId = "CLIENT ID"
final val ClientSetName = "CLIENT SETNAME"
final val Select = "SELECT"
}
......@@ -16,221 +16,10 @@
package zio.redis.options
import zio.{Chunk, Duration}
import java.net.InetAddress
import scala.collection.compat._
trait Connection {
sealed case class Address(ip: InetAddress, port: Int) {
private[redis] final def asString: String = s"${ip.getHostAddress}:$port"
}
object Address {
private[redis] final def fromString(addr: String): Option[Address] =
addr.split(":").toList match {
case ip :: port :: Nil => port.toIntOption.map(new Address(InetAddress.getByName(ip), _))
case _ => None
}
}
sealed case class ClientEvents(readable: Boolean = false, writable: Boolean = false)
sealed trait ClientFlag
object ClientFlag {
case object Blocked extends ClientFlag
case object BroadcastTrackingMode extends ClientFlag
case object IsMaster extends ClientFlag
case object KeysTrackingEnabled extends ClientFlag
case object MonitorMode extends ClientFlag
case object MultiExecContext extends ClientFlag
case object PubSub extends ClientFlag
case object ReadOnlyMode extends ClientFlag
case object Replica extends ClientFlag
case object ToBeClosedAfterReply extends ClientFlag
case object ToBeClosedAsap extends ClientFlag
case object TrackingTargetClientInvalid extends ClientFlag
case object Unblocked extends ClientFlag
case object UnixDomainSocket extends ClientFlag
case object WatchedKeysModified extends ClientFlag
private[redis] lazy val Flags =
Map(
'A' -> ClientFlag.ToBeClosedAsap,
'b' -> ClientFlag.Blocked,
'B' -> ClientFlag.BroadcastTrackingMode,
'c' -> ClientFlag.ToBeClosedAfterReply,
'd' -> ClientFlag.WatchedKeysModified,
'M' -> ClientFlag.IsMaster,
'O' -> ClientFlag.MonitorMode,
'P' -> ClientFlag.PubSub,
'r' -> ClientFlag.ReadOnlyMode,
'R' -> ClientFlag.TrackingTargetClientInvalid,
'S' -> ClientFlag.Replica,
't' -> ClientFlag.KeysTrackingEnabled,
'u' -> ClientFlag.Unblocked,
'U' -> ClientFlag.UnixDomainSocket,
'x' -> ClientFlag.MultiExecContext
)
}
sealed case class ClientInfo(
id: Option[Long] = None,
name: Option[String] = None,
address: Option[Address] = None,
localAddress: Option[Address] = None,
fileDescriptor: Option[Long] = None,
age: Option[Duration] = None,
idle: Option[Duration] = None,
flags: Set[ClientFlag] = Set.empty,
databaseId: Option[Long] = None,
subscriptions: Option[Int] = None,
patternSubscriptions: Option[Int] = None,
multiCommands: Option[Int] = None,
queryBufferLength: Option[Int] = None,
queryBufferFree: Option[Int] = None,
outputBufferLength: Option[Int] = None,
outputListLength: Option[Int] = None,
outputBufferMem: Option[Long] = None,
events: ClientEvents = ClientEvents(),
lastCommand: Option[String] = None,
argvMemory: Option[Long] = None,
totalMemory: Option[Long] = None,
redirectionClientId: Option[Long] = None,
user: Option[String] = None
)
object ClientInfo {
private[redis] final def from(line: String): ClientInfo = {
val data = line.trim.split(" ").map(_.split("=").toList).collect { case k :: v :: Nil => k -> v }.toMap
val events = data.get("events")
new ClientInfo(
id = data.get("id").flatMap(_.toLongOption),
name = data.get("name"),
address = data.get("addr").flatMap(Address.fromString),
localAddress = data.get("laddr").flatMap(Address.fromString),
fileDescriptor = data.get("fd").flatMap(_.toLongOption),
age = data.get("age").flatMap(_.toLongOption).map(Duration.fromSeconds),
idle = data.get("idle").flatMap(_.toLongOption).map(Duration.fromSeconds),
flags = data
.get("flags")
.fold(Set.empty[ClientFlag])(_.foldLeft(Set.empty[ClientFlag])((fs, f) => fs ++ ClientFlag.Flags.get(f))),
databaseId = data.get("id").flatMap(_.toLongOption),
subscriptions = data.get("sub").flatMap(_.toIntOption),
patternSubscriptions = data.get("psub").flatMap(_.toIntOption),
multiCommands = data.get("multi").flatMap(_.toIntOption),
queryBufferLength = data.get("qbuf").flatMap(_.toIntOption),
queryBufferFree = data.get("qbuf-free").flatMap(_.toIntOption),
outputListLength = data.get("oll").flatMap(_.toIntOption),
outputBufferMem = data.get("omem").flatMap(_.toLongOption),
events = ClientEvents(readable = events.exists(_.contains("r")), writable = events.exists(_.contains("w"))),
lastCommand = data.get("cmd"),
argvMemory = data.get("argv-mem").flatMap(_.toLongOption),
totalMemory = data.get("total-mem").flatMap(_.toLongOption),
redirectionClientId = data.get("redir").flatMap(_.toLongOption),
user = data.get("user")
)
}
private[redis] final def from(lines: Array[String]): Chunk[ClientInfo] =
Chunk.fromArray(lines.map(from))
}
sealed trait ClientKillFilter
object ClientKillFilter {
sealed case class Address(ip: InetAddress, port: Int) extends ClientKillFilter {
private[redis] final def asString: String = s"${ip.getHostAddress}:$port"
}
sealed case class LocalAddress(ip: InetAddress, port: Int) extends ClientKillFilter {
private[redis] final def asString: String = s"${ip.getHostAddress}:$port"
}
sealed case class Id(id: Long) extends ClientKillFilter
sealed case class Type(clientType: ClientType) extends ClientKillFilter
sealed case class User(username: String) extends ClientKillFilter
sealed case class SkipMe(skip: Boolean) extends ClientKillFilter
}
sealed trait ClientPauseMode { self =>
private[redis] final def asString: String =
self match {
case ClientPauseMode.All => "ALL"
case ClientPauseMode.Write => "WRITE"
}
}
object ClientPauseMode {
case object All extends ClientPauseMode
case object Write extends ClientPauseMode
}
sealed case class ClientTrackingFlags(
clientSideCaching: Boolean,
trackingMode: Option[ClientTrackingMode] = None,
noLoop: Boolean = false,
caching: Option[Boolean] = None,
brokenRedirect: Boolean = false
)
sealed case class ClientTrackingInfo(
flags: ClientTrackingFlags,
redirect: ClientTrackingRedirect,
prefixes: Set[String] = Set.empty
)
sealed trait ClientTrackingMode { self =>
private[redis] final def asString: String =
self match {
case ClientTrackingMode.OptIn => "OPTIN"
case ClientTrackingMode.OptOut => "OPTOUT"
case ClientTrackingMode.Broadcast => "BCAST"
}
}
object ClientTrackingMode {
case object OptIn extends ClientTrackingMode
case object OptOut extends ClientTrackingMode
case object Broadcast extends ClientTrackingMode
}
sealed trait ClientTrackingRedirect
object ClientTrackingRedirect {
case object NotEnabled extends ClientTrackingRedirect
case object NotRedirected extends ClientTrackingRedirect
sealed case class RedirectedTo(clientId: Long) extends ClientTrackingRedirect
}
sealed trait ClientType { self =>
private[redis] final def asString: String =
self match {
case ClientType.Normal => "NORMAL"
case ClientType.Master => "MASTER"
case ClientType.Replica => "REPLICA"
case ClientType.PubSub => "PUBSUB"
}
}
object ClientType {
case object Normal extends ClientType
case object Master extends ClientType
case object Replica extends ClientType
case object PubSub extends ClientType
}
sealed trait UnblockBehavior { self =>
private[redis] final def asString: String =
self match {
case UnblockBehavior.Timeout => "TIMEOUT"
case UnblockBehavior.Error => "ERROR"
}
}
object UnblockBehavior {
case object Timeout extends UnblockBehavior
case object Error extends UnblockBehavior
}
}
......@@ -5,8 +5,6 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._
import java.net.InetAddress
trait ConnectionSpec extends BaseSpec {
def connectionSuite: Spec[Redis, RedisError] =
suite("connection")(
......@@ -18,26 +16,6 @@ trait ConnectionSpec extends BaseSpec {
} yield assert(res)(isUnit)
}
),
suite("clientCaching")(
test("track keys") {
for {
redis <- ZIO.service[Redis]
_ <- redis.clientTrackingOff
_ <- redis.clientTrackingOn(trackingMode = Some(ClientTrackingMode.OptIn))
_ <- redis.clientCaching(true)
trackingInfo <- redis.clientTrackingInfo
} yield assert(trackingInfo.flags.caching)(isSome(isTrue))
},
test("don't track keys") {
for {
redis <- ZIO.service[Redis]
_ <- redis.clientTrackingOff
_ <- redis.clientTrackingOn(trackingMode = Some(ClientTrackingMode.OptOut))
_ <- redis.clientCaching(false)
trackingInfo <- redis.clientTrackingInfo
} yield assert(trackingInfo.flags.caching)(isSome(isFalse))
}
),
suite("clientId")(
test("get client id") {
for {
......@@ -45,189 +23,12 @@ trait ConnectionSpec extends BaseSpec {
} yield assert(id)(isGreaterThan(0L))
}
),
suite("clientInfo")(
test("get client info") {
for {
redis <- ZIO.service[Redis]
id <- redis.clientId
info <- ZIO.serviceWithZIO[Redis](_.clientInfo)
} yield assert(info.id)(isSome(equalTo(id))) && assert(info.name)(isNone)
}
),
suite("clientKill")(
test("error when a connection with the specified address doesn't exist") {
for {
error <- ZIO.serviceWithZIO[Redis](_.clientKill(Address(InetAddress.getByName("0.0.0.0"), 0)).either)
} yield assert(error)(isLeft)
},
test("specify filters that don't kill the connection") {
for {
clientsKilled <-
ZIO.serviceWithZIO[Redis](_.clientKill(ClientKillFilter.SkipMe(false), ClientKillFilter.Id(3341L)))
} yield assert(clientsKilled)(equalTo(0L))
},
test("specify filters that kill the connection but skipme is enabled") {
for {
redis <- ZIO.service[Redis]
id <- redis.clientId
clientsKilled <- redis.clientKill(ClientKillFilter.SkipMe(true), ClientKillFilter.Id(id))
} yield assert(clientsKilled)(equalTo(0L))
}
),
suite("clientList")(
test("get clients' info") {
for {
info <- ZIO.serviceWithZIO[Redis](_.clientList())
} yield assert(info)(isNonEmpty)
},
test("get clients' info filtered by type") {
for {
redis <- ZIO.service[Redis]
infoNormal <- redis.clientList(Some(ClientType.Normal))
infoPubSub <- redis.clientList(Some(ClientType.PubSub))
} yield assert(infoNormal)(isNonEmpty) && assert(infoPubSub)(isEmpty)
},
test("get clients' info filtered by client IDs") {
for {
redis <- ZIO.service[Redis]
id <- redis.clientId
nonExistingId = id + 1
info <- redis.clientList(clientIds = Some((id, Nil)))
infoNonExisting <- redis.clientList(clientIds = Some((nonExistingId, Nil)))
} yield assert(info)(isNonEmpty) && assert(info.head.id)(isSome(equalTo(id))) && assert(infoNonExisting)(
isEmpty
)
}
),
suite("clientGetRedir")(
test("tracking disabled") {
for {
redis <- ZIO.service[Redis]
_ <- redis.clientTrackingOff
redir <- redis.clientGetRedir
} yield assert(redir)(equalTo(ClientTrackingRedirect.NotEnabled))
},
test("tracking enabled but not redirecting") {
for {
redis <- ZIO.service[Redis]
_ <- redis.clientTrackingOn()
redir <- redis.clientGetRedir
} yield assert(redir)(equalTo(ClientTrackingRedirect.NotRedirected))
}
),
suite("client pause and unpause")(
test("clientPause") {
for {
unit <- ZIO.serviceWithZIO[Redis](_.clientPause(1.second, Some(ClientPauseMode.All)))
} yield assert(unit)(isUnit)
},
test("clientUnpause") {
for {
unit <- ZIO.serviceWithZIO[Redis](_.clientUnpause)
} yield assert(unit)(isUnit)
}
),
test("set and get name") {
for {
redis <- ZIO.service[Redis]
_ <- redis.clientSetName("foo")
name <- redis.clientGetName
} yield assert(name.getOrElse(""))(equalTo("foo"))
} @@ clusterExecutorUnsupported,
suite("clientTracking")(
test("enable tracking in broadcast mode and with prefixes") {
for {
redis <- ZIO.service[Redis]
_ <- redis.clientTrackingOff
_ <- redis.clientTrackingOn(None, Some(ClientTrackingMode.Broadcast), prefixes = Set("foo"))
trackingInfo <- redis.clientTrackingInfo
} yield assert(trackingInfo.redirect)(equalTo(ClientTrackingRedirect.NotRedirected)) &&
assert(trackingInfo.flags)(
equalTo(ClientTrackingFlags(clientSideCaching = true, trackingMode = Some(ClientTrackingMode.Broadcast)))
) &&
assert(trackingInfo.prefixes)(equalTo(Set("foo")))
},
test("disable tracking") {
for {
redis <- ZIO.service[Redis]
_ <- redis.clientTrackingOff
trackingInfo <- redis.clientTrackingInfo
} yield assert(trackingInfo.redirect)(equalTo(ClientTrackingRedirect.NotEnabled)) &&
assert(trackingInfo.flags)(
equalTo(ClientTrackingFlags(clientSideCaching = false))
) &&
assert(trackingInfo.prefixes)(equalTo(Set.empty[String]))
}
),
suite("clientTrackingInfo")(
test("get tracking info when tracking is disabled") {
for {
redis <- ZIO.service[Redis]
_ <- redis.clientTrackingOff
trackingInfo <- redis.clientTrackingInfo
} yield assert(trackingInfo)(
equalTo(
ClientTrackingInfo(
flags = ClientTrackingFlags(clientSideCaching = false),
redirect = ClientTrackingRedirect.NotEnabled
)
)
)
},
test("get tracking info when tracking is enabled in optin mode with noloop and caching on") {
for {
redis <- ZIO.service[Redis]
_ <- redis.clientTrackingOff
_ <- redis.clientTrackingOn(trackingMode = Some(ClientTrackingMode.OptIn), noLoop = true)
_ <- redis.clientCaching(true)
trackingInfo <- redis.clientTrackingInfo
} yield assert(trackingInfo)(
equalTo(
ClientTrackingInfo(
flags = ClientTrackingFlags(
clientSideCaching = true,
trackingMode = Some(ClientTrackingMode.OptIn),
caching = Some(true),
noLoop = true
),
redirect = ClientTrackingRedirect.NotRedirected
)
)
)
}
),
suite("clientUnblock")(
test("unblock client that isn't blocked") {
for {
redis <- ZIO.service[Redis]
id <- redis.clientId
bool <- redis.clientUnblock(id)
} yield assert(bool)(equalTo(false))
}
),
suite("ping")(
test("PING with no input") {
ZIO.serviceWithZIO[Redis](_.ping(None).map(assert(_)(equalTo("PONG"))))
} @@ clusterExecutorUnsupported,
test("PING with input") {
ZIO.serviceWithZIO[Redis](_.ping(Some("Hello")).map(assert(_)(equalTo("Hello"))))
},
test("PING with a string argument will not lock executor") {
ZIO.serviceWithZIO[Redis](
_.ping(Some("Hello with a newline\n")).map(assert(_)(equalTo("Hello with a newline\n")))
)
},
test("PING with a multiline string argument will not lock executor") {
ZIO.serviceWithZIO[Redis](
_.ping(Some("Hello with a newline\r\nAnd another line\n"))
.map(assert(_)(equalTo("Hello with a newline\r\nAnd another line\n")))
)
}
),
test("reset") {
for {
unit <- ZIO.serviceWithZIO[Redis](_.reset)
} yield assert(unit)(isUnit)
} @@ clusterExecutorUnsupported
) @@ sequential
}
......@@ -226,85 +226,6 @@ object InputSpec extends BaseSpec {
} yield assert(result)(equalTo(RespCommand(Literal("CH"))))
}
),
suite("ClientKill")(
test("address") {
for {
address <- ZIO.succeed(InetAddress.getByName("127.0.0.1"))
port <- ZIO.succeed(42)
result <- ZIO.attempt(ClientKillInput.encode(ClientKillFilter.Address(address, port)))
} yield assert(result)(equalTo(RespCommand(Literal("ADDR"), Value("127.0.0.1:42"))))
},
test("local address") {
for {
address <- ZIO.succeed(InetAddress.getByName("127.0.0.1"))
port <- ZIO.succeed(42)
result <- ZIO.attempt(ClientKillInput.encode(ClientKillFilter.LocalAddress(address, port)))
} yield assert(result)(equalTo(RespCommand(Literal("LADDR"), Value(s"127.0.0.1:42"))))
},
test("client id") {
for {
id <- ZIO.succeed(42L)
result <- ZIO.attempt(ClientKillInput.encode(ClientKillFilter.Id(id)))
} yield assert(result)(equalTo(RespCommand(Literal("ID"), Value("42"))))
},
test("type") {
for {
clientType <- ZIO.succeed(ClientType.PubSub)
result <- ZIO.attempt(ClientKillInput.encode(ClientKillFilter.Type(clientType)))
} yield assert(result)(equalTo(RespCommand(Literal("TYPE"), Literal("PUBSUB"))))
},
test("user") {
for {
user <- ZIO.succeed("Foo Bar")
result <- ZIO.attempt(ClientKillInput.encode(ClientKillFilter.User(user)))
} yield assert(result)(equalTo(RespCommand(Literal("USER"), Value("Foo Bar"))))
},
test("skip me") {
for {
result <- ZIO.attempt(ClientKillInput.encode(ClientKillFilter.SkipMe(true)))
} yield assert(result)(equalTo(RespCommand(Literal("SKIPME"), Literal("YES"))))
}
),
suite("ClientPauseMode")(
test("all") {
for {
result <- ZIO.attempt(ClientPauseModeInput.encode(ClientPauseMode.All))
} yield assert(result)(equalTo(RespCommand(Literal("ALL"))))
},
test("write") {
for {
result <- ZIO.attempt(ClientPauseModeInput.encode(ClientPauseMode.Write))
} yield assert(result)(equalTo(RespCommand(Literal("WRITE"))))
}
),
suite("ClientTracking")(
test("off") {
for {
result <- ZIO.attempt(ClientTrackingInput.encode(None))
} yield assert(result)(equalTo(RespCommand(Literal("OFF"))))
},
test("client redirect with noloop and prefixes") {
for {
clientId <- ZIO.succeed(42L)
prefixes <- ZIO.succeed(Chunk("prefix1", "prefix2", "prefix3"))
result <- ZIO.attempt(ClientTrackingInput.encode(Some((Some(clientId), None, true, prefixes))))
} yield assert(result)(
equalTo(
RespCommand(Literal("ON"), Literal("REDIRECT"), Value(clientId.toString)) ++ prefixes
.map(p => RespCommand(Literal("PREFIX"), Value(p)))
.fold(RespCommand.empty)(_ ++ _) ++ RespCommand(Literal("NOLOOP"))
)
)
},
test("broadcast mode") {
for {
result <-
ZIO.attempt(
ClientTrackingInput.encode(Some((None, Some(ClientTrackingMode.Broadcast), false, Chunk.empty)))
)
} yield assert(result)(equalTo(RespCommand(Literal("ON"), Literal("BCAST"))))
}
),
suite("Copy")(
test("valid value") {
for {
......@@ -1094,18 +1015,6 @@ object InputSpec extends BaseSpec {
)
}
),
suite("UnblockBehavior")(
test("timeout") {
for {
result <- ZIO.attempt(UnblockBehaviorInput.encode(UnblockBehavior.Timeout))
} yield assert(result)(equalTo(RespCommand(Value("TIMEOUT"))))
},
test("error") {
for {
result <- ZIO.attempt(UnblockBehaviorInput.encode(UnblockBehavior.Error))
} yield assert(result)(equalTo(RespCommand(Value("ERROR"))))
}
),
suite("Varargs")(
test("with multiple elements") {
for {
......@@ -1387,18 +1296,6 @@ object InputSpec extends BaseSpec {
} yield assert(result)(equalTo(RespCommand(Key("key"), Literal("PERSIST")))) &&
assert(resultWithoutOption)(equalTo(RespCommand(Key("key"))))
}
),
suite("YesNo")(
test("yes") {
for {
result <- ZIO.attempt(YesNoInput.encode(true))
} yield assert(result)(equalTo(RespCommand(Literal("YES"))))
},
test("no") {
for {
result <- ZIO.attempt(YesNoInput.encode(false))
} yield assert(result)(equalTo(RespCommand(Literal("NO"))))
}
)
)
}
......@@ -886,156 +886,6 @@ object OutputSpec extends BaseSpec {
)
}
)
),
suite("ClientTrackingInfo")(
test("extract with tracking off") {
for {
resp <- ZIO.succeed(
RespValue
.array(
RespValue.bulkString("flags"),
RespValue.array(RespValue.bulkString("off")),
RespValue.bulkString("redirect"),
RespValue.Integer(-1L),
RespValue.bulkString("prefixes"),
RespValue.NullArray
)
)
expectedInfo <- ZIO.succeed(
ClientTrackingInfo(
ClientTrackingFlags(
clientSideCaching = false
),
ClientTrackingRedirect.NotEnabled
)
)
res <- ZIO.attempt(ClientTrackingInfoOutput.unsafeDecode(resp))
} yield assert(res)(equalTo(expectedInfo))
},
test("extract with flags set") {
for {
resp <- ZIO.succeed(
RespValue
.array(
RespValue.bulkString("flags"),
RespValue.array(
RespValue.bulkString("on"),
RespValue.bulkString("optin"),
RespValue.bulkString("caching-yes"),
RespValue.bulkString("noloop")
),
RespValue.bulkString("redirect"),
RespValue.Integer(0L),
RespValue.bulkString("prefixes"),
RespValue.NullArray
)
)
expectedInfo <- ZIO.succeed(
ClientTrackingInfo(
ClientTrackingFlags(
clientSideCaching = true,
trackingMode = Some(ClientTrackingMode.OptIn),
noLoop = true,
caching = Some(true)
),
ClientTrackingRedirect.NotRedirected
)
)
res <- ZIO.attempt(ClientTrackingInfoOutput.unsafeDecode(resp))
} yield assert(res)(equalTo(expectedInfo))
},
test("extract with redirect id and broken redirect flag") {
for {
resp <- ZIO.succeed(
RespValue
.array(
RespValue.bulkString("flags"),
RespValue.array(RespValue.bulkString("on"), RespValue.bulkString("broken_redirect")),
RespValue.bulkString("redirect"),
RespValue.Integer(42L),
RespValue.bulkString("prefixes"),
RespValue.NullArray
)
)
expectedInfo <- ZIO.succeed(
ClientTrackingInfo(
ClientTrackingFlags(clientSideCaching = true, brokenRedirect = true),
ClientTrackingRedirect.RedirectedTo(42L)
)
)
res <- ZIO.attempt(ClientTrackingInfoOutput.unsafeDecode(resp))
} yield assert(res)(equalTo(expectedInfo))
},
test("extract with specified prefixes") {
for {
resp <- ZIO.succeed(
RespValue
.array(
RespValue.bulkString("flags"),
RespValue.array(RespValue.bulkString("on"), RespValue.bulkString("bcast")),
RespValue.bulkString("redirect"),
RespValue.Integer(0L),
RespValue.bulkString("prefixes"),
RespValue.array(
RespValue.bulkString("prefix1"),
RespValue.bulkString("prefix2"),
RespValue.bulkString("prefix3")
)
)
)
expectedInfo <- ZIO.succeed(
ClientTrackingInfo(
ClientTrackingFlags(
clientSideCaching = true,
trackingMode = Some(ClientTrackingMode.Broadcast)
),
ClientTrackingRedirect.NotRedirected,
Set("prefix1", "prefix2", "prefix3")
)
)
res <- ZIO.attempt(ClientTrackingInfoOutput.unsafeDecode(resp))
} yield assert(res)(equalTo(expectedInfo))
},
test("error when fields are missing") {
for {
resp <- ZIO.succeed(
RespValue
.array(
RespValue.bulkString("redirect"),
RespValue.Integer(42L),
RespValue.bulkString("prefixes"),
RespValue.NullArray
)
)
res <- ZIO.attempt(ClientTrackingInfoOutput.unsafeDecode(resp)).either
} yield assert(res)(isLeft(isSubtype[ProtocolError](anything)))
}
),
suite("ClientTrackingRedirect")(
test("extract not enabled") {
for {
resp <- ZIO.succeed(RespValue.Integer(-1L))
res <- ZIO.attempt(ClientTrackingRedirectOutput.unsafeDecode(resp))
} yield assert(res)(equalTo(ClientTrackingRedirect.NotEnabled))
},
test("extract not redirected") {
for {
resp <- ZIO.succeed(RespValue.Integer(0L))
res <- ZIO.attempt(ClientTrackingRedirectOutput.unsafeDecode(resp))
} yield assert(res)(equalTo(ClientTrackingRedirect.NotRedirected))
},
test("extract redirect id") {
for {
resp <- ZIO.succeed(RespValue.Integer(42L))
res <- ZIO.attempt(ClientTrackingRedirectOutput.unsafeDecode(resp))
} yield assert(res)(equalTo(ClientTrackingRedirect.RedirectedTo(resp.value)))
},
test("error when redirect id is invalid") {
for {
resp <- ZIO.succeed(RespValue.Integer(-42L))
res <- ZIO.attempt(ClientTrackingRedirectOutput.unsafeDecode(resp)).either
} yield assert(res)(isLeft(isSubtype[ProtocolError](anything)))
}
)
)
......
......@@ -88,12 +88,9 @@ object BuildHelper {
private def extraOptions(scalaVersion: String, optimize: Boolean) =
CrossVersion.partialVersion(scalaVersion) match {
case Some((3, _)) =>
List("-language:implicitConversions", "-Xignore-scala2-macros", "-Wconf:origin=scala.collection.compat.*:s")
List("-language:implicitConversions", "-Xignore-scala2-macros")
case Some((2, 13)) =>
List(
"-Ywarn-unused:params,-implicits",
"-Wconf:origin=scala.collection.compat.*:s"
) ++ std2xOptions ++ optimizerOptions(optimize)
List("-Ywarn-unused:params,-implicits") ++ std2xOptions ++ optimizerOptions(optimize)
case Some((2, 12)) =>
List(
"-opt-warnings",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册