Code Monkey home page Code Monkey logo

brando's Introduction

Brando

Build Status

A lightweight Redis client for use with Akka.

Using

In your build.sbt

resolvers += "chrisdinn" at "http://chrisdinn.github.io/releases/"

libraryDependencies += "com.digital-achiever" %% "brando" % "3.0.3"

Getting started

Brando is a lightweight wrapper around the Redis protocol.

Create a Redis actor with your server host and port.

  import brando._

  val redis = system.actorOf(Redis("localhost", 6379))

You should specify a database and password if you intend to use them.

  val redis = system.actorOf(Redis("localhost", 6379, database = 5, auth = "password"))

This is important; if your Redis actor restarts you want be sure it reconnects successfully and to the same database.

Next, send it a command and get your response as a reply.

  redis ! Request("PING")

  // Response: Some(Pong)

The Redis protocol supports 5 standard types of reply: Status, Error, Integer, Bulk and Multi Bulk as well as a special NULL Bulk/Multi Bulk reply.

Status replies are returned as case objects, such as Pong and Ok.

  redis ! Request("SET", "some-key", "this-value")

  // Response: Some(Ok)

Error replies are returned as akka.actor.Status.Failure objects containing an an exception with server's response as its message.

  redis ! Request("EXPIRE", "1", "key")
  
  // Response: Failure(brando.RedisException: ERR value is not an integer or out of range)

Integer replies are returned as Option[Long].

  redis ! Request("SADD", "some-set", "one", "two")

  // Response: Some(2)

Bulk replies as Option[akka.util.ByteString].

  redis ! Request("GET", "some-key")

  // Response: Some(ByteString("this-value"))

Multi Bulk replies as Option[List[Option[ByteString]]].

  redis ! Request("SMEMBERS", "some-set")

  // Response: Some(List(Some(ByteString("one")), Some(ByteString("two"))))

NULL replies are returned as None and may appear either on their own or nested inside a Multi Bulk reply.

  redis ! Request("GET", "non-existent-key")

  // Response: None

If you're not sure what to expect in response to a request, please refer to the Redis command documentation at http://redis.io/commands where the reply type for each is clearly stated.

To ensure that a list of requests are executed back to back, the Redis actor can receive the following message :

redis ! Batch(Request("MULTI"), Request("SET", "mykey", "somevalue"), Request("GET", "mykey"), Request("EXEC"))

// Response : List(Some(Ok), Some(Queued), Some(Queued), Some("somevalue"))

This is very usefull in that case since it'll make sure no other requests are executed between the MULTI and EXEC commands. Responses will also be grouped in a single list of the same size as the Batch requests.

Response extractors

Use the provided response extractors to map your Redis reply to a more appropriate Scala type.

  for{ Response.AsString(value) ← redis ? Request("GET", "key") } yield value
  
  //value: String
  
  for{ Response.AsStrings(values) ← redis ? Request("KEYS", "*") } yield values
  
  //values: Seq[String]
  
  for{ Response.AsByteSeqs(value) ← redis ? Request("GET", "key") } yield value
  
  //value: Seq[Byte]
  
  for{ Response.AsStringsHash(fields) ← redis ? Request("HGETALL", "hash-key") } yield fields
  
  //fields: Map[String,String]

Monitoring Connection State Changes

If a set of listeners is provided to the Redis actor when it is created , it will inform the those listeners about state changes to the underlying Redis connection. For example (from inside an actor):

  val redis = context.actorOf(Redis("localhost", 6379, listeners = Set(self)))

Currently, the possible messages sent to each listener include the following:

  • Connecting: When creating a TCP connection.
  • Connected: When a TCP connection has been created, and Authentication (if applicable) has succeeded.
  • Disconnected: The connection has been lost. Redis transparently handles disconnects and will automatically reconnect, so typically no user action at all is needed here. During the time that Redis is disconnected, Redis commands sent will be queued be processed once the connection is reestablished.
  • AuthenticationFailed: The TCP connected was made, but Redis auth failed.
  • ConnectionFailed: A connection could not be established after the number of attempts defined during creation connectionRetryAttempts. Brando will not attempt to recover from this state; the user should take action.

All these messages inherit from the Connection.StateChange trait.

Requests stashing when connection is not established

Current Brando implementation throws an exception when the connection is not established and it receives a request. However, sometimes you do not need the answer right now and you can wait a bit before the connection gets established. The StashingRedis actor provides this functionality, it is designed as a proxy to be used. When it receives requests while the connection is not established, it stashes them. Once the connection gets established, it unstashes them and passed them to the actual Redis actor. When the connection is established, all incoming requests are passed right through.

This was the built-in behavior of Brando 2.x.x

  // create the Redis actor
  val brando = system.actorOf(Redis(listeners = Set(self)))
  // create the stashing proxy
  val proxy = system.actorOf(StashingRedis(brando))
  // query the proxy
  proxy ? Request("PING")

Note: StashingRedis cannot be used with a sharded connection.

Sentinel

Sentinel Client

Sentinel provides support for monitoring, notification and automatic failover using sentinel. It is implemented based on the following guidelines and requires redis 2.8.12 or later.

A sentinel client can be created like this. Here, we are using two servers and we provide a listener to receive Connection.StateChange events.

val sentinel = system.actorOf(Sentinel(Seq(
      Server("localhost", 26380),
      Server("localhost", 26379)), Set(probe.ref)))

You can listen for events using the following:

sentinel ! Request("SENTINEL","SUBSCRIBE", "failover-end")

You can also send commands such as

sentinel ! Request("SENTINEL", "MASTERS")

Redis with Sentinel

Redis can be used with Sentinel to provide automatic failover and discovery. To do so you need to create a Sentinel and a RedisSentinel actor. In this example we are connecting to the master mymaster

	val sentinel = system.actorOf(Sentinel(Seq(
      Server("localhost", 26380),
      Server("localhost", 26379))))

    val redis = system.actorOf(RedisSentinel("mymaster", sentinel))

    redis ! Request("PING")

For reliability we encourage to pass connectionHeartbeatDelay when using RedisSentinel, this will generate a heartbeat to Redis and will improve failures detections in the case of network partitions.

Sharding

Brando provides support for sharding, as outlined in the Redis documentation and in this blog post from antirez.

To use it, simply create an instance of ShardManager, passing it a list of Redis shards you'd like it to connect to. From there, we create a pool of Redis instances - one for each shard.

val shards = Seq(RedisShard("redis1", "10.0.0.1", 6379),
				 RedisShard("redis2", "10.0.0.2", 6379),
				 RedisShard("redis3", "10.0.0.3", 6379))

val shardManager = context.actorOf(ShardManager(shards))

Once an instance of ShardManager has been created, it can be sent several types of messages:

  • Request objects for inferring the shard key from the params
  • Tuple2[String, Request] objects for specifying the shard key explicitly
  • ShardBroadcast objects for broadcasting requests to all shards

Here are some examples,

shardManager ! Request("SET", "mykey", "some_value")
shardManager ! ("myshardkey", Request("SET", "mykey", "some_value"))
shardManager ! BroadcastRequest("LPOP", "mylistkey") // don't use the ask pattern

Note that the ShardManager explicitly requires a key for all operations except for the BroadcastRequest. This is because the key is used to determined which shard each request should be forwarded to. In this context, operations which operate on multiple keys (e.g. MSET, MGET) or no keys at all (e.g. SELECT, FLUSHDB) should be avoided, as they break the Redis sharding model. Also note that the BroadcastRequest must not be used with the ask pattern in Akka or responses will be lost!

Individual shards can have their configuration updated on the fly. To do this, send a Shard message to ShardManager.

shardManager ! RedisShard("redis1", "10.0.0.4", 6379)

val shardManager = context.actorOf(ShardManager(shards, listeners = Set(self)))

The ShardManager will forward all Connection.StateChange messages when a shard changes state.

Sharding with sentinel

It's possible to use sharding with Sentinel, to do so you need to use SentinelShard instead of RedisShard

    val shards = Seq(
      SentinelShard("mymaster1"),
      SentinelShard("mymaster2"))

    val sentinel = system.actorOf(Sentinel()) //defaults host and port are localhost:26379

    val shardManager = context.actorOf(ShardManager(shards,sentinel))

Run the tests

  • Start sentinel

      sudo redis-sentinel redis-config/sentinel.conf --sentinel
    
  • Start a Redis master and slave

      sudo redis-server test-config/redis.conf --loglevel verbose
      sudo mkdir /var/lib/redis-slave
      sudo redis-server test-config/redis-slave.conf --loglevel verbose
    
  • Run the tests

      sbt test
    

Documentation

Read the API documentation here: http://chrisdinn.github.io/api/brando-3.0.3/

Mailing list

Send questions, comments or discussion topics to the mailing list [email protected].

License

This project is released under the Apache License v2, for more details see the 'LICENSE' file.

Contributing

Fork the project, add tests if possible and send a pull request.

Contributors

Chris Dinn, Jason Goodwin, Tyson Hamilton, Gaetan Hervouet, Damien Levin, Matt MacAulay, Arron Norwell

Changelog

v3.x.x

Brando no longer implements akka.actor.Stash. In consequence, all incoming requests throw a RedisDisconnectedException if the Connection is not established. This version delegates the responsibility to the sender, it is no longer handled by the Brando itself. In older versions, when the connection was not established, the requests were stashed. When established, all stashed requests were unstashed and processed. For smoother migration, there is StashingRedis providing the same behavior. For more details see Requests stashing when connection is not established

brando's People

Contributors

alexbool avatar chrisdinn avatar ghouet avatar gramk avatar jasongoodwin avatar karelcemus avatar paintcan avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

brando's Issues

Migration guide from v2 to v3

Previous versions of brando implemented Akka Stash and queued requests when the redis was not connected. As part of #46 this behavior was silently dropped and version 3 throws an exception instead, which is significantly different behavior.

  1. I suggest to document this change because I spent several hours debugging why the application is not working after the upgrade.
  2. It would be wonderful to provide some kind of migration guide or and simple option to re-enable this behavior. I really miss it. In asynchronous connection, I don't really think it must be handled by the application itself if the connection can be (re-)establish within a certain timeout.

Thanks!

HA issue - ShardManager will try to send a connect message to a dead actor if it receives an updated shard

It appears if brando has a shard connected to a dead redis instance that actor will die.
If we try to send a new shard to the shard manager for that host, then it will send a Connect message to the dead actor so it can never resolve in some failover scenarios.

I'll look at the code a bit more later - these are high level observations.

Course of action is to make the connections immutable and just poisonpill and recreate connections on Shard messages to the shard manager.

Add Batch Support for Sharding

accept something like (key:String, batch: Batch).
Would be useful to have a way to do multi/exec, watch etc with sharding when doing multiple operations on a shard.

Batched requests can get sent out of order

Batched requests can get sent out of order when a given request fails to write to the tcp socket. The connection will get a Tcp.CommandFailed message and then retry the message. But at this point it is possible other requests have already been sent successfully.

Strange Exception

Do you know what might generate this exception? I am finding this using play-redis, which has a dependency on brando, on Heroku.

a.a.OneForOneStrategy - queue empty java.util.NoSuchElementException: queue empty at scala.collection.mutable.Queue.dequeue(Queue.scala:66) ~[org.scala-lang.scala-library-2.11.7.jar:na] at brando.Connection$$anonfun$receive$1$$anonfun$applyOrElse$4.apply(Connection.scala:91) ~[com.digital-achiever.brando_2.11-3.0.3.jar:3.0.3] at brando.Connection$$anonfun$receive$1$$anonfun$applyOrElse$4.apply(Connection.scala:78) ~[com.digital-achiever.brando_2.11-3.0.3.jar:3.0.3] at brando.ReplyParser$class.parseReply(ReplyParser.scala:145) ~[com.digital-achiever.brando_2.11-3.0.3.jar:3.0.3] at brando.Connection.parseReply(Connection.scala:27) ~[com.digital-achiever.brando_2.11-3.0.3.jar:3.0.3] at brando.Connection$$anonfun$receive$1.applyOrElse(Connection.scala:78) ~[com.digital-achiever.brando_2.11-3.0.3.jar:3.0.3] at akka.actor.Actor$class.aroundReceive(Actor.scala:467) ~[com.typesafe.akka.akka-actor_2.11-2.3.13.jar:na] at brando.Connection.aroundReceive(Connection.scala:27) ~[com.digital-achiever.brando_2.11-3.0.3.jar:3.0.3] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [com.typesafe.akka.akka-actor_2.11-2.3.13.jar:na] at akka.actor.ActorCell.invoke(ActorCell.scala:487) [com.typesafe.akka.akka-actor_2.11-2.3.13.jar:na]

Authentication doesn't work

I traced issues reported as Brando #64 and play-redis #44 and #29

Backstory

We noticed the first occurrence of the issue on Heroku because it secures its redis instance. It caused excessive logging without any reasonable error message.

However, I successfully reproduced in on localhost when I secured my redis instance.

How to reproduce

Enable redis authentication in redis.conf, e.g., uncomment

requirepass foobared

Code reproducing the issue

Example of Play framework controller:

package controllers

import javax.inject.Inject

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import play.api.mvc._

import akka.actor.ActorSystem
import akka.pattern._
import brando.Redis

class Application @Inject( )( system: ActorSystem ) extends Controller {

  implicit val timeout = akka.util.Timeout( 1.second )

  val actor = system actorOf Redis( "localhost", 6379, database = 1, auth = Some( "foobared" ) )

  def ping( ) = actor ? brando.Request( "PING" )

  def index = Action.async {

    ping( ).map { case message =>
      Ok( views.html.index( "done", message ) )
    }.recover { case ex =>
      Ok( views.html.index( "error", ex.toString ) )
    }
  }

}

Observation

When used Redis actor the response is the redis is disconnected. When the StashingRedis is used, requests timeout. Both no further reason provided.

Tracing the issue

To trace the issue, I inserted several logging statements into Brando. I figured out two things:

  1. Authentication method is never called even when proper auth is provided. The reason is it stucks in Connecting state but the authentication is executed on Connected.
  2. The following logging statements suggest that Brando always creates new connection with every attempt but probably somehow does not closes the previous. First, actor id changes, second, when deployed on Heroku, it dies after 30 attempts due to 'ERR max number of clients reached'.
2016-04-21T13:07:53.071807+00:00 app[web.1]: Queue(Actor[akka://application/temp/$v])
2016-04-21T13:07:54.081759+00:00 app[web.1]: connecting
2016-04-21T13:07:54.093458+00:00 app[web.1]: Some(Failure(brando.RedisException: NOAUTH Authentication required))
2016-04-21T13:07:54.093624+00:00 app[web.1]: Queue(Actor[akka://application/temp/$w])
2016-04-21T13:07:55.112304+00:00 app[web.1]: connecting
2016-04-21T13:07:55.119680+00:00 app[web.1]: Some(Failure(brando.RedisException: NOAUTH Authentication required))
2016-04-21T13:07:55.119837+00:00 app[web.1]: Queue(Actor[akka://application/temp/$x])
2016-04-21T13:07:56.135040+00:00 app[web.1]: connecting
2016-04-21T13:07:56.140825+00:00 app[web.1]: Some(Failure(brando.RedisException: NOAUTH Authentication required))
2016-04-21T13:07:56.141029+00:00 app[web.1]: Queue(Actor[akka://application/temp/$y])
2016-04-21T13:07:57.152416+00:00 app[web.1]: connecting
2016-04-21T13:07:57.161871+00:00 app[web.1]: Some(Failure(brando.RedisException: NOAUTH Authentication required))
2016-04-21T13:07:57.162046+00:00 app[web.1]: Queue(Actor[akka://application/temp/$z])
2016-04-21T13:07:58.184895+00:00 app[web.1]: connecting
2016-04-21T13:07:58.200840+00:00 app[web.1]: Some(Failure(brando.RedisException: NOAUTH Authentication required))
2016-04-21T13:07:58.210118+00:00 app[web.1]: Queue(Actor[akka://application/temp/$A])
2016-04-21T13:07:59.221666+00:00 app[web.1]: connecting
2016-04-21T13:07:59.226342+00:00 app[web.1]: Some(Failure(brando.RedisException: NOAUTH Authentication required))
2016-04-21T13:07:59.226689+00:00 app[web.1]: Queue(Actor[akka://application/temp/$B])
2016-04-21T13:08:00.243840+00:00 app[web.1]: connecting
2016-04-21T13:08:00.250774+00:00 app[web.1]: Some(Failure(brando.RedisException: NOAUTH Authentication required))
2016-04-21T13:08:00.251051+00:00 app[web.1]: Queue(Actor[akka://application/temp/$C])
2016-04-21T13:08:01.264740+00:00 app[web.1]: connecting
2016-04-21T13:08:01.273008+00:00 app[web.1]: Some(Failure(brando.RedisException: NOAUTH Authentication required))
2016-04-21T13:08:01.273249+00:00 app[web.1]: Queue(Actor[akka://application/temp/$D])
2016-04-21T13:08:02.293235+00:00 app[web.1]: connecting
2016-04-21T13:08:02.297606+00:00 app[web.1]: Some(Failure(brando.RedisException: ERR max number of clients reached))
2016-04-21T13:08:02.297748+00:00 app[web.1]: Queue(Actor[akka://application/temp/$E])

Subsequent issues

When max number of clients is reached, it crashes at this line.

Finally, it goes out of memory after while. However, I am not sure what exactly causes the memory leaking, just guessing.

ReplyParser should cache parse progress on failure

ReplyParser re-parses the entire reply bytestring every time it gets new bytes from Redis. If you get lots of small responses from Redis (which happens, especially with commands like SMEMBERS) it can be surprisingly slow.

Cannot recover from `Connection refused` during connect/reconnect

When testing the recovering of disconnects by creating a ipfw rule on OSX, I find that Brando doesn't seems to recover.

To block/allow a port I'm using :
sudo ipfw add 1 deny tcp from any to any 6379
sudo ipfw del 1 deny tcp from any to any 6379

Seems like when the connection is setup, Brando is able to detect the disconnection when the rule is set. However, it cannot connect or reconnect.

It might be an akka IO issue I just posted something on the user list : https://groups.google.com/forum/#!topic/akka-user/sL_wL_OQgbo

Add Event Messages for Connection Status

It's currently difficult to determine the state of connections in Brando.
Tcp.ConnectionClosed, Tcp.Connected etc should send events (possibly to the parent or to listeners passed in) so that services can respond accordingly.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.