riemann-scala-client

Scala client library for sending events to Riemann, featuring strong typing, asynchronous API (using Akka under the hood) and a DSL to avoid cluttering the application codebase with metrics-related code.

  • Current stable version: 0.4.0 for Scala 2.11 and Scala 2.10, on master.
  • Old stable version: 0.3.4 for Scala 2.11 and Scala 2.10, on master.
  • Current stable version on the v0.3-scala210 branch: 0.3.2 for Scala 2.10.
  • Previous stable version: 0.2.1 for Scala 2.9, on the v0.2-scala29 branch.

Usage

Build System

In build.sbt (Scala 2.11.7):

// riemann-java-client comes from clojars.org
resolvers += "clojars.org" at "http://clojars.org/repo"

libraryDependencies += "net.benmur" %% "riemann-scala-client" % "0.4.0"

Or in pom.xml if you are using Maven (keep only the dependency matching your Scala version):

<dependency>
  <groupId>net.benmur</groupId>
  <artifactId>riemann-scala-client_2.11</artifactId>
  <version>0.4.0</version>
</dependency>
<dependency>
  <groupId>net.benmur</groupId>
  <artifactId>riemann-scala-client_2.10</artifactId>
  <version>0.4.0</version>
</dependency>

Minimum viable use case

The imports list is somewhat longer than it used to be, because Scala 2.11 became more picky about choosing implicits (having both Reliable and Unreliable variants of SendOff in scope makes it unable to choose either one).

Issue #10 is open about simplifying imports again.

import net.benmur.riemann.client.RiemannClient.{riemannConnectAs, Unreliable}
import net.benmur.riemann.client.UnreliableIO._
import net.benmur.riemann.client.EventSenderDSL._
import net.benmur.riemann.client.EventDSL._

import akka.actor.ActorSystem
import akka.util.Timeout
import scala.concurrent.duration.DurationInt
import java.net.InetSocketAddress

object RiemannSendTest extends App {
    implicit val system = ActorSystem()
    implicit val timeout = Timeout(5.seconds)

    val metrics = riemannConnectAs[Unreliable] to new InetSocketAddress("localhost", 5555)
    service("service name") | state("warning") |>> metrics
}

Change Unreliable to Reliable and UnreliableIO to ReliableIO as needed, which will make the |>< sending operation (returning a Future) available.

Connecting

val tcpDestination = riemannConnectAs[Reliable] to new InetSocketAddress("localhost", 5555)
val udpDestination = riemannConnectAs[Unreliable] to new InetSocketAddress("localhost", 5555)

Please note that operations returning a Future won't compile if the connection is created with an Unreliable type parameter. This is intentional. (They will compile, however, if you have an implicit in scope implementing SendAndExpectFeedback[_, _, Unreliable].)
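The compile-time restriction described above can be pictured with a small, self-contained model. This is only a sketch under stated assumptions, not the library's code: `Connection`, `Sender`, `sendWithFeedback` and the single-parameter `SendAndExpectFeedback[T]` below are simplified, hypothetical stand-ins (the README mentions a three-parameter `SendAndExpectFeedback[_, _, Unreliable]`). The idea is that a feedback-returning send requires implicit evidence for the transport type, and such evidence is only provided for the reliable transport.

```scala
import scala.concurrent.Future

// Simplified stand-ins for illustration; NOT the library's real definitions.
sealed trait TransportType
sealed trait Reliable extends TransportType
sealed trait Unreliable extends TransportType

// A connection tagged at the type level with its transport.
final case class Connection[T <: TransportType](name: String)

// Evidence that a transport can report whether a send succeeded.
trait SendAndExpectFeedback[T <: TransportType] {
  def send(conn: Connection[T], payload: String): Future[Boolean]
}

object SendAndExpectFeedback {
  // Only the Reliable transport gets an instance by default.
  implicit val reliableFeedback: SendAndExpectFeedback[Reliable] =
    new SendAndExpectFeedback[Reliable] {
      def send(conn: Connection[Reliable], payload: String): Future[Boolean] =
        Future.successful(true) // pretend the server acknowledged the message
    }
}

object Sender {
  // Compiles only when evidence exists for the connection's transport type.
  def sendWithFeedback[T <: TransportType](conn: Connection[T], payload: String)(
      implicit ev: SendAndExpectFeedback[T]): Future[Boolean] =
    ev.send(conn, payload)
}

object FeedbackGateDemo {
  val tcp: Connection[Reliable] = Connection[Reliable]("tcp")
  val udp: Connection[Unreliable] = Connection[Unreliable]("udp")

  val ack: Future[Boolean] = Sender.sendWithFeedback(tcp, "event")

  // Uncommenting the next line is a compile error in this model, because
  // no implicit SendAndExpectFeedback[Unreliable] is in scope:
  // val nope = Sender.sendWithFeedback(udp, "event")
}
```

In this model, supplying your own implicit `SendAndExpectFeedback[Unreliable]` would make the commented-out call compile, which mirrors the escape hatch mentioned in the parenthetical above.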

Building events

Building an event is done by combining event parts. Each part is optional, as per the Protocol Buffers definition. Here is how to build a completely populated event:

val event = oneEvent() | host("hostname") | service("service xyz") | state("warning") | time(1234L) | 
            description("metric is way too high") | tags("performance", "slow", "provider-xyz") | 
            metric(0.8f) | ttl(120L)

// which is the same as (but with fewer intermediate objects instantiated):
val event2 = EventPart(host=Some("hostname"), service=Some("service xyz"), state=Some("warning"),
             description=Some("metric is way too high"), time=Some(1234L),
             tags=Seq("performance", "slow", "provider-xyz"), metric=Some(0.8f), ttl=Some(120L))
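As a rough mental model of how optional parts combine with `|`, here is a self-contained toy. Everything in it is an assumption made for illustration: `ToyPart`, `ToyDsl`, the reduced field set and the merge rule (the right-hand side wins for scalar fields, tags accumulate) are hypothetical and are not taken from the library's `EventPart` implementation.

```scala
// Toy model only: the fields and the merge rule are assumptions,
// not the behaviour of the library's EventPart.
final case class ToyPart(
    host: Option[String] = None,
    service: Option[String] = None,
    state: Option[String] = None,
    tags: List[String] = Nil) {

  // Combine two parts: a field set on the right-hand side overrides
  // the left-hand side, and tags from both sides are concatenated.
  def |(other: ToyPart): ToyPart = ToyPart(
    host = other.host.orElse(host),
    service = other.service.orElse(service),
    state = other.state.orElse(state),
    tags = tags ++ other.tags)
}

object ToyDsl {
  // Each builder produces a part with exactly one field populated.
  def host(h: String): ToyPart = ToyPart(host = Some(h))
  def service(s: String): ToyPart = ToyPart(service = Some(s))
  def state(s: String): ToyPart = ToyPart(state = Some(s))
  def tags(ts: String*): ToyPart = ToyPart(tags = ts.toList)
}

object ToyDslDemo {
  import ToyDsl._

  // Chaining small parts yields the same value as one fully populated part.
  val combined: ToyPart =
    host("hostname") | service("service xyz") | state("warning") | tags("slow")

  val direct: ToyPart = ToyPart(
    host = Some("hostname"),
    service = Some("service xyz"),
    state = Some("warning"),
    tags = List("slow"))
}
```

Because every field is an `Option` (or an empty list), a part that sets nothing acts as a neutral element in this toy, which is one way to read the statement that each part is optional.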

Setting default event values

// given these declarations:
val destination = riemannConnectAs[Reliable] to new InetSocketAddress("localhost", 5555)
val destinationWithDefaults = destination withValues(host("host") | service("service response time"))

// this:
state("warning") | metric(0.5) |>> destinationWithDefaults
// is the same as:
host("host") | service("service response time") | state("warning") | metric(0.5) |>> destination

// implementing a counter or heartbeat is easy:
val signupCounter = destination withValues(service("Successful user registration"))
oneEvent() |>> signupCounter

Sending events: fire-and-forget mode (over UDP)

val metricsDestination = riemannConnectAs[Unreliable] to new
  InetSocketAddress("localhost", 5555) withValues(host("host") | service("service response time"))

state("warning") | metric(0.5) |>> metricsDestination

Sending events: fire-and-forget mode (over TCP)

val metricsDestination = riemannConnectAs[Reliable] to new
  InetSocketAddress("localhost", 5555) withValues(host("host") | service("service response time"))

state("warning") | metric(0.5) |>> metricsDestination

Sending events and waiting for a Future (over TCP)

val metricsDestination = riemannConnectAs[Reliable] to new
  InetSocketAddress("localhost", 5555) withValues(host("host") | service("service response time"))

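// assumes scala.util.{Failure, Success} are imported and an implicit ExecutionContext is in scope
state("warning") | metric(0.5) |>< metricsDestination onComplete {
  case Failure(exception) => // ...
  case Success(false)     => println("not sent ok!!")
  case Success(true)      => println("sent ok")
}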

Sending a text query (over TCP)

val metricsDestination = riemannConnectAs[Reliable] to new InetSocketAddress("localhost", 5555)

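// assumes scala.util.{Failure, Success} are imported and an implicit ExecutionContext is in scope
Query("tagged \"slow\"") |>< metricsDestination onComplete {
  case Failure(exception) => // ...
  case Success(events)    => events foreach println
}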

As noted under Connecting, operations returning a Future won't compile if the connection is created with an Unreliable type parameter. This is intentional. (They will compile, however, if you have an implicit in scope implementing SendAndExpectFeedback[_, _, Unreliable].)

Status

Pull requests are very welcome.

This version is intended to work with Scala 2.11 and Akka 2.3.

Care has been taken to be as reliable as possible, because sending metrics should not impact your application's stability. In particular:

  • Unit test coverage is fairly good. No metrics are available yet, but the only code not tested is the actual socket code (which amounts to a total of 5 lines), for which the different conditions are mocked.
  • All API-visible data structures are immutable and concurrency-friendly
  • Network writes are serialized through Akka actors
  • Exceptions are not propagated to the caller (they are only logged to the Akka event bus)
  • Failed connections are retried at most twice per second

Please see next milestone's open issues list for items pending implementation.

Authors/Licensing

  • (c) 2012-2015 Rached Ben Mustapha
  • licensed under the MIT license, please see the LICENSE file for details.
  • thanks to Kyle Kingsbury for Riemann and riemann-java-client
  • thanks to Pavel Minchenkov who started the Scala 2.10 port
  • thanks to Michael Allman for the Scala 2.11 port, TCP reconnection improvements, float values roundtripping fixes and a UDP connection fix
  • thanks to Matt Sullivan who started the Akka 2.3 move
  • thanks to github.com/janlisse for event attributes support

riemann-scala-client's Issues

Question about development status

Hi,

are you still actively pursuing this? Or did you let the project silently die? If so, what were the reasons? Did you lose interest or just hit a problem with it?

Jan

Retry failed Writes

Retry failed Writes after reconnecting (with a counter and backoff time, to be gentle)
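One possible shape for the "counter and backoff time" idea is sketched below. It is not part of the library and is not a proposed patch: `RetryBackoff`, `delayFor` and `schedule` are hypothetical names, and the 500 ms base delay and 30 s cap are assumptions chosen only for illustration. The sketch computes how long to wait before each retry attempt, doubling the delay each time up to a cap, and limits the number of attempts.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not part of riemann-scala-client.
object RetryBackoff {

  // Delay to wait before the given retry attempt (1-based).
  // The delay doubles with each attempt and never exceeds `cap`.
  def delayFor(
      attempt: Int,
      base: FiniteDuration = 500.millis,
      cap: FiniteDuration = 30.seconds): FiniteDuration = {
    require(attempt >= 1, "attempt numbers start at 1")
    // Clamp the exponent so the multiplication cannot overflow.
    val factor = 1L << math.min(attempt - 1, 20)
    val delay = base * factor
    if (delay > cap) cap else delay
  }

  // The full list of delays for a bounded number of attempts (the "counter").
  def schedule(
      maxAttempts: Int,
      base: FiniteDuration = 500.millis,
      cap: FiniteDuration = 30.seconds): List[FiniteDuration] =
    (1 to maxAttempts).toList.map(attempt => delayFor(attempt, base, cap))
}
```

A caller would give up on a write once the schedule is exhausted; whether dropped writes should then be logged or reported is left open here, as it is in the request above.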

Metric deserialization does not roundtrip long values

I've run into an issue with roundtripping a Riemann event with a metric of type Long. The scenario is roughly as follows:

  1. I create and send an Event with a metric of type Long.
  2. I query for that event and receive a metric of type Float.
  3. :(

I've debugged the issue and it appears to be caused by a difference in the way riemann-scala-client transcodes metrics versus https://github.com/aphyr/riemann-java-client. Note in particular that when writing a metric value of type Long, the Riemann Java client sets both the int64 and float metric values on the underlying protobuf. Why? I don't know. But it does. Therefore, when the Riemann server answers a query for an event with a long metric, it sets the float value as well.

When riemann-scala-client decodes the event protobuf, it looks for a float value before a long: https://github.com/benmur/riemann-scala-client/blob/master/src/main/scala/net/benmur/riemann/client/Serializers.scala#L53. And therein lies the confusion.

I looked carefully but I actually couldn't find a method in the Riemann Java client code which decodes an event protobuf's metric into a single canonical value. I guess it's up to the user of the library to decide how to decode metric values. However, I did find code within the Riemann Ruby client codebase which decodes an event protobuf's metric: https://github.com/aphyr/riemann-ruby-client/blob/master/lib/riemann/event.rb#L192. It looks for a double value first, then a long, then a float. In that scheme, a long metric encoded by the java client would indeed be decoded as a long (even though the float metric value exists as well in the underlying protobuf).
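To make the difference between the two lookup orders concrete, here is a small self-contained sketch. It does not use either client's real classes: `RawMetric` is a hypothetical stand-in for the three optional metric fields on the event protobuf, and the `Metric` types and function names are invented for illustration. The double-long-float order follows the description of the Ruby client above. For the float-first order, only "float before long" comes from the description above; where the double field is checked is an assumption.

```scala
// Hypothetical stand-in for the protobuf's optional metric fields.
final case class RawMetric(
    metricD: Option[Double] = None,
    metricSint64: Option[Long] = None,
    metricF: Option[Float] = None)

sealed trait Metric
final case class DoubleMetric(value: Double) extends Metric
final case class LongMetric(value: Long) extends Metric
final case class FloatMetric(value: Float) extends Metric

object MetricDecoding {

  // Order described for the Ruby client: double, then long, then float.
  def decodeDoubleLongFloat(raw: RawMetric): Option[Metric] =
    raw.metricD.map(DoubleMetric(_))
      .orElse(raw.metricSint64.map(LongMetric(_)))
      .orElse(raw.metricF.map(FloatMetric(_)))

  // Float is checked before long, as described for the Scala client.
  // Checking double last is an assumption made for this sketch.
  def decodeFloatFirst(raw: RawMetric): Option[Metric] =
    raw.metricF.map(FloatMetric(_))
      .orElse(raw.metricSint64.map(LongMetric(_)))
      .orElse(raw.metricD.map(DoubleMetric(_)))
}

object MetricDecodingDemo {
  // A long metric as described for the Java client's encoding:
  // both the int64 and the float fields are populated.
  val longWrittenByJavaClient: RawMetric =
    RawMetric(metricSint64 = Some(42L), metricF = Some(42.0f))

  val rubyStyle: Option[Metric] =
    MetricDecoding.decodeDoubleLongFloat(longWrittenByJavaClient)

  val floatFirst: Option[Metric] =
    MetricDecoding.decodeFloatFirst(longWrittenByJavaClient)
}
```

With both fields set, the double-long-float order yields a `LongMetric` while the float-first order yields a `FloatMetric`, which is the roundtrip mismatch reported here.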

I've thought about how to rectify this. The easiest fix (by far) is to simply change the metric decoding to work like the java client. However, this would introduce an incompatibility with previous versions. (Perhaps a warning and a bump in version to 0.4.0 would suffice.)

A more ambitious fix would be to support both decoding schemes, at least as a bridge to cross-compatibility: a "legacy" mode preserving the previous decoding behavior and a "standard" or "java" mode emulating the java client behavior. This is something I've sketched out, but it's pretty ambitious and touches a lot of code.

What do you think?

Minimum viable use case does not compile

The following class, taken from the "Minimum viable use case" section, does not compile. Errors are below.
Using 0.3.3-SNAPSHOT.

package com.example.myproject

import net.benmur.riemann.client._
//import net.benmur.riemann.client.EventSenderDSL._
import RiemannClient._

import akka.actor.ActorSystem
import akka.util.Timeout

import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit._

import com.twitter.inject.Logging

/**
 * Riemann proxy client.
 */
object Metrics extends Logging
{
  private implicit val system = ActorSystem()
  private implicit def timeout = Timeout(5, SECONDS)

  val metrics = riemannConnectAs[Unreliable] to new InetSocketAddress("localhost", 5555)
  service("service name") | state("warning") |>> metrics
}

compile
[info] Compiling 9 Scala sources to /.../target/scala-2.11/classes...
[error] /.../Metrics.scala:23: value |>> is not a member of net.benmur.riemann.client.EventPart
[error] service("service name") | state("warning") |>> metrics
[error] ^
[error] one error found
error Compilation failed
[error] Total time: 2 s, completed Jul 22, 2015 10:56:48 AM

Add support for event attributes

Currently there is no way to send event attribute values (key-value pairs) to Riemann.
It would be nice to have this (the Java Proto.Event#Builder already contains setAttributes methods).
I can create a PR to add support for this if you like.

Never recovers from connection failure

I just restarted my Riemann server and saw my client app enter a failure state wherein every attempt to send data to Riemann failed, logging this:

2016-01-19 04:34:51,895 ERROR n.b.r.c.ReliableIO$TcpConnectionActor akka://foo/user/riemann-tcp-client-1/io - could not send or receive data
java.io.EOFException: null
        at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.7.0_65]
        at net.benmur.riemann.client.ReliableIO$TcpConnectionActor$$anonfun$receive$2.applyOrElse(ReliableIO.scala:90) ~[foo-249.jar:0.0]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467) ~[foo-249.jar:0.0]
        at net.benmur.riemann.client.ReliableIO$TcpConnectionActor.aroundReceive(ReliableIO.scala:79) ~[foo-249.jar:0.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [foo-249.jar:0.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:487) [foo-249.jar:0.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [foo-249.jar:0.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:220) [foo-249.jar:0.0]
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) [foo-249.jar:0.0]
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [foo-249.jar:0.0]
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [foo-249.jar:0.0]
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [foo-249.jar:0.0]
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [foo-249.jar:0.0]
2016-01-19 04:34:51,896 INFO  akka.actor.DeadLetterActorRef akka://foo/deadLetters - Message [com.aphyr.riemann.Proto$Msg] from Actor[akka://foo/user/riemann-tcp-client-1/io#916728015] to Actor[akka://foo/deadLetters] was not delivered. [270] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Restarting the client app fixed the problem, leading me to assume that there's some state in the Riemann client library that isn't getting reset after a connection fails.

riemann-scala-client version 0.3.4
