kafka-statsd-metrics2


Send Kafka Metrics to StatsD.


Let us know! If you fork this, use it, or find it helpful in any way, we'd love to hear from you! [email protected]

What is it about?

Kafka uses Yammer Metrics (now part of the Dropwizard project) for metrics reporting in both the server and the client. This can be configured to report stats using pluggable stats reporters to hook up to your monitoring system.

This project provides a simple integration between Kafka and a StatsD reporter for Metrics.

Metrics can be filtered based on the metric name and the metric dimensions (min, max, percentiles, etc).
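
To get a feel for name-based filtering, the sketch below tests a few metric names from the table in this README against the default exclusion regex quoted in the configuration section. It is only an illustration: the helper `is_excluded` is hypothetical, and it assumes the regex must match the whole metric name (as Java's `Matcher.matches()` would), which this README does not confirm.

```python
import re

# Default exclusion regex, copied from the configuration section of this README.
DEFAULT_EXCLUDE_REGEX = (
    r"(kafka\.consumer\.FetchRequestAndResponseMetrics.*)"
    r"|(.*ReplicaFetcherThread.*)"
    r"|(kafka\.server\.FetcherLagMetrics\..*)"
    r"|(kafka\.log\.Log\..*)"
    r"|(kafka\.cluster\.Partition\..*)"
)


def is_excluded(metric_name: str, exclude_regex: str = DEFAULT_EXCLUDE_REGEX) -> bool:
    """Return True if the whole metric name matches the exclusion regex.

    Assumption: whole-string matching; the reporter's real matching
    semantics are not documented here.
    """
    return re.fullmatch(exclude_regex, metric_name) is not None
```

With these assumptions, per-partition names such as `kafka.log.Log.NumLogSegments` are excluded, while broker-level names such as `kafka.server.ReplicaManager.LeaderCount` pass through.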

Supported Kafka versions

  • For Kafka or later use kafka-statsd-metrics2-0.4.0
  • For Kafka or prior use kafka-statsd-metrics2-0.3.0



  • 0.4.0 adds support for tags on metrics. See dogstatsd extensions. If your statsd server does not support tags, you can disable them in the Kafka configuration. See property external.kafka.statsd.tag.enabled below.

  • The statsd client is com.indeed:java-dogstatsd-client:2.0.11.

  • Supports the new MetricNames introduced by Kafka 0.8.2.x.


  • initial release

How to install?

  • Download or build the shadow jar for kafka-statsd-metrics.
  • Install the jar in Kafka classpath, typically ./kafka_2.9.2-
  • In the Kafka config file, add the following properties. Default values are in parentheses.
    # declare the reporter

    # enable the reporter, (false)

    # the host of the StatsD server (localhost)

    # the port of the StatsD server (8125)

    # enable the support of statsd tag extension, e.g. datadog statsd (true)

    # a prefix for all metrics names (empty)
    # note that the StatsD reporter follows the global polling interval (10)
    # kafka.metrics.polling.interval.secs=10

    # A regex to exclude some metrics
    # Default is: (kafka\.consumer\.FetchRequestAndResponseMetrics.*)|(.*ReplicaFetcherThread.*)|(kafka\.server\.FetcherLagMetrics\..*)|(kafka\.log\.Log\..*)|(kafka\.cluster\.Partition\..*)
    # The metric name is formatted with this template:
    # external.kafka.statsd.metrics.exclude_regex=
    # Each metric provides multiple dimensions: min, max, meanRate, etc
    # This might be too much data.
    # It is possible to disable some metric dimensions with the following properties:
    # By default all dimensions are enabled.
    # external.kafka.statsd.dimension.enabled.count=true
    # external.kafka.statsd.dimension.enabled.meanRate=true
    # external.kafka.statsd.dimension.enabled.rate1m=true
    # external.kafka.statsd.dimension.enabled.rate5m=true
    # external.kafka.statsd.dimension.enabled.rate15m=true
    # external.kafka.statsd.dimension.enabled.min=true
    # external.kafka.statsd.dimension.enabled.max=true
    # external.kafka.statsd.dimension.enabled.mean=true
    # external.kafka.statsd.dimension.enabled.stddev=true
    # external.kafka.statsd.dimension.enabled.median=true
    # external.kafka.statsd.dimension.enabled.p75=true
    # external.kafka.statsd.dimension.enabled.p95=true
    # external.kafka.statsd.dimension.enabled.p98=true
    # external.kafka.statsd.dimension.enabled.p99=true
    # external.kafka.statsd.dimension.enabled.p999=true
  • finally restart the Kafka server
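
Before restarting, it can help to double-check which dimensions a given config would leave enabled. The sketch below is one way to do that, not part of this project: `parse_properties` and `enabled_dimensions` are hypothetical helpers, and the parser handles only simple `key=value` lines (no `:` separators, escapes, or line continuations). The property names and the default of `true` are taken from the list above.

```python
# Dimension names as listed in the configuration comments above.
DIMENSIONS = (
    "count", "meanRate", "rate1m", "rate5m", "rate15m",
    "min", "max", "mean", "stddev", "median",
    "p75", "p95", "p98", "p99", "p999",
)
PREFIX = "external.kafka.statsd.dimension.enabled."


def parse_properties(text: str) -> dict:
    """Parse simple 'key=value' lines, skipping blanks and comments.

    Simplification: this is not a full Java .properties parser.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def enabled_dimensions(props: dict) -> list:
    """Return the dimensions left enabled; unset properties default to true."""
    return [d for d in DIMENSIONS if props.get(PREFIX + d, "true").lower() == "true"]
```

For example, a config that sets only `external.kafka.statsd.dimension.enabled.p999=false` would yield every dimension except `p999`.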

How to test your configuration?

You can check your configuration in different ways:

  • During Kafka startup, the reporter class will be instantiated and initialized. The logs should contain a message similar to: "Kafka Statsd metrics reporter is enabled"
  • A JMX MBean named kafka:type=com.airbnb.kafka.KafkaStatsdMetricsReporter should also exist.
  • Check the logs of your StatsD server
  • Finally, on the configured StatsD host, you could listen on the configured port and check for incoming data:
    # assuming the Statsd server has been stopped...
    $ nc -ul 8125
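
If `nc` is not available, a small UDP listener does the same job. The sketch below is an illustration under stated assumptions: `parse_statsd_line` and `listen` are hypothetical helpers, and the line layout `name:value|type[|@rate][|#tag,tag]` follows the common StatsD format with the dogstatsd tag extension, not anything specific to this reporter. As with `nc`, the StatsD server must be stopped so the port is free.

```python
import socket


def parse_statsd_line(line: str) -> dict:
    """Split one StatsD line 'name:value|type[|@rate][|#tag,tag]' into parts."""
    name, _, rest = line.partition(":")
    fields = rest.split("|")
    parsed = {
        "name": name,
        "value": fields[0],
        "type": fields[1] if len(fields) > 1 else "",
        "tags": [],
    }
    for extra in fields[2:]:
        if extra.startswith("#"):  # dogstatsd tag section
            parsed["tags"] = extra[1:].split(",")
    return parsed


def listen(host: str = "0.0.0.0", port: int = 8125, max_packets=None) -> None:
    """Print every StatsD line received on the given UDP port.

    Runs until max_packets datagrams have arrived (forever if None).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind((host, port))
        received = 0
        while max_packets is None or received < max_packets:
            data, _addr = sock.recvfrom(65535)
            for line in data.decode("utf-8", errors="replace").splitlines():
                if line:
                    print(parse_statsd_line(line))
            received += 1
```

Calling `listen()` on the configured StatsD host prints one parsed dictionary per incoming metric line; stop it with Ctrl-C.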

List of metrics for Kafka 0.8.2

Below are the metrics in Kafka 0.8.2

    | Metrics kind | Metric Name                                                                 | Metric Tags                                                                                                 |
    | Gauge        | kafka.server.ReplicaManager.LeaderCount                                     |                                                                                                             |
    | Gauge        | kafka.server.ReplicaManager.PartitionCount                                  |                                                                                                             |
    | Gauge        | kafka.server.ReplicaManager.UnderReplicatedPartitions                       |                                                                                                             |
    | Gauge        | kafka.controller.KafkaController.ActiveControllerCount                      |                                                                                                             |
    | Gauge        | kafka.controller.KafkaController.OfflinePartitionsCount                     |                                                                                                             |
    | Gauge        | kafka.controller.KafkaController.PreferredReplicaImbalanceCount             |                                                                                                             |
    | Gauge        |                               |                                                                                                             |
    | Gauge        | kafka.server.ReplicaFetcherManager.Replica_MaxLag                           | {"clientId" -> clientId}                                                                                    |
    | Gauge        | kafka.server.ReplicaFetcherManager.Replica_MinFetchRate                     | {"clientId" -> clientId}                                                                                    |
    | Gauge        | kafka.server.FetchRequestPurgatory.PurgatorySize                            |                                                                                                             |
    | Gauge        | kafka.server.FetchRequestPurgatory.NumDelayedRequests                       |                                                                                                             |
    | Gauge        | kafka.server.ProducerRequestPurgatory.PurgatorySize                         |                                                                                                             |
    | Gauge        | kafka.server.ProducerRequestPurgatory.NumDelayedRequests                    |                                                                                                             |
    | Gauge        | kafka.consumer.ConsumerFetcherManager.MaxLag                                | {"clientId" -> clientId}                                                                                    |
    | Gauge        | kafka.consumer.ConsumerFetcherManager.MinFetchRate                          | {"clientId" -> clientId}                                                                                    |
    | Gauge        | kafka.consumer.ZookeeperConsumerConnector.FetchQueueSize                    | {"clientId" -> config.clientId, "topic" -> topic, "threadId" -> thread}                                     |
    | Gauge        |                              | {"Processor" -> i}                                                                                          |
    | Timer        | kafka.log.LogFlushStats.LogFlushRateAndTimeMs                               |                                                                                                             |
    | Meter        | kafka.server.ReplicaManager.IsrExpandsPerSec                                |                                                                                                             |
    | Meter        | kafka.server.ReplicaManager.IsrShrinksPerSec                                |                                                                                                             |
    | Meter        | kafka.server.DelayedFetchRequestMetrics.FollowerExpiresPerSecond            |                                                                                                             |
    | Meter        | kafka.server.DelayedFetchRequestMetrics.ConsumerExpiresPerSecond            |                                                                                                             |
    | Meter        | kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec               |                                                                                                             |
    | Timer        | kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs                |                                                                                                             |
    | Meter        | kafka.producer.ProducerStats.SerializationErrorsPerSec                      | {"clientId" -> clientId}                                                                                    |
    | Meter        | kafka.producer.ProducerStats.ResendsPerSec                                  | {"clientId" -> clientId}                                                                                    |
    | Meter        | kafka.producer.ProducerStats.FailedSendsPerSec                              | {"clientId" -> clientId}                                                                                    |
    | Meter        | kafka.producer.ProducerTopicMetrics.MessagesPerSec_all                      | {"clientId" -> clientId}                                                                                    |
    | Meter        | kafka.producer.ProducerTopicMetrics.BytesPerSec_all                         | {"clientId" -> clientId}                                                                                    |
    | Meter        | kafka.producer.ProducerTopicMetrics.DroppedMessagesPerSec_all               | {"clientId" -> clientId}                                                                                    |
    | Meter        | kafka.producer.ProducerTopicMetrics.MessagesPerSec                          | {"clientId" -> clientId, "topic" -> topic}                                                                  |
    | Meter        | kafka.producer.ProducerTopicMetrics.BytesPerSec                             | {"clientId" -> clientId, "topic" -> topic}                                                                  |
    | Meter        | kafka.producer.ProducerTopicMetrics.DroppedMessagesPerSec                   | {"clientId" -> clientId, "topic" -> topic}                                                                  |
    | Meter        | kafka.server.FetcherStats.RequestsPerSec                                    | {"clientId" -> metricId.clientId, "brokerHost" -> metricId.brokerHost, "brokerPort" -> metricId.brokerPort} |
    | Meter        | kafka.server.FetcherStats.BytesPerSec                                       | {"clientId" -> metricId.clientId, "brokerHost" -> metricId.brokerHost, "brokerPort" -> metricId.brokerPort} |
    | Meter        | kafka.server.BrokerTopicMetrics.MessagesInPerSec_all                        |                                                                                                             |
    | Meter        | kafka.server.BrokerTopicMetrics.BytesInPerSec_all                           |                                                                                                             |
    | Meter        | kafka.server.BrokerTopicMetrics.BytesOutPerSec_all                          |                                                                                                             |
    | Meter        | kafka.server.BrokerTopicMetrics.LogBytesAppendedPerSec_all                  |                                                                                                             |
    | Meter        | kafka.server.BrokerTopicMetrics.FailedProduceRequestsPerSec_all             |                                                                                                             |
    | Meter        | kafka.server.BrokerTopicMetrics.FailedFetchRequestsPerSec_all               |                                                                                                             |
    | Meter        | kafka.server.BrokerTopicMetrics.MessagesInPerSec                            | {"topic" -> topic}                                                                                          |
    | Meter        | kafka.server.BrokerTopicMetrics.BytesInPerSec                               | {"topic" -> topic}                                                                                          |
    | Meter        | kafka.server.BrokerTopicMetrics.BytesOutPerSec                              | {"topic" -> topic}                                                                                          |
    | Meter        | kafka.server.BrokerTopicMetrics.LogBytesAppendedPerSec                      | {"topic" -> topic}                                                                                          |
    | Meter        | kafka.server.BrokerTopicMetrics.FailedProduceRequestsPerSec                 | {"topic" -> topic}                                                                                          |
    | Meter        | kafka.server.BrokerTopicMetrics.FailedFetchRequestsPerSec                   | {"topic" -> topic}                                                                                          |
    | Meter        | kafka.server.DelayedProducerRequestMetrics.ExpiresPerSecond_all             |                                                                                                             |
    | Meter        | kafka.server.DelayedProducerRequestMetrics.ExpiresPerSecond                 | {"topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition}                            |
    | Timer        | kafka.producer.ProducerRequestMetrics.ProducerRequestRateAndTimeMs_all      | {"clientId" -> clientId}                                                                                    |
    | Histogram    | kafka.producer.ProducerRequestMetrics.ProducerRequestSize_all               | {"clientId" -> clientId}                                                                                    |
    | Timer        | kafka.consumer.FetchRequestAndResponseMetrics.FetchRequestRateAndTimeMs_all | {"clientId" -> clientId}                                                                                    |
    | Histogram    | kafka.consumer.FetchRequestAndResponseMetrics.FetchResponseSize_all         | {"clientId" -> clientId}                                                                                    |
    | Timer        | kafka.producer.ProducerRequestMetrics.ProducerRequestRateAndTimeMs          | {"clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort}                            |
    | Histogram    | kafka.producer.ProducerRequestMetrics.ProducerRequestSize                   | {"clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort}                            |
    | Timer        | kafka.consumer.FetchRequestAndResponseMetrics.FetchRequestRateAndTimeMs     | {"clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort}                            |
    | Histogram    | kafka.consumer.FetchRequestAndResponseMetrics.FetchResponseSize             | {"clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort}                            |
    | Meter        | kafka.consumer.ConsumerTopicMetrics.MessagesPerSec_all                      | {"clientId" -> clientId}                                                                                    |
    | Meter        | kafka.consumer.ConsumerTopicMetrics.BytesPerSec_all                         | {"clientId" -> clientId}                                                                                    |
    | Meter        | kafka.consumer.ConsumerTopicMetrics.MessagesPerSec                          | {"clientId" -> clientId, "topic" -> topic}                                                                  |
    | Meter        | kafka.consumer.ConsumerTopicMetrics.BytesPerSec                             | {"clientId" -> clientId, "topic" -> topic}                                                                  |
    | Meter        |                                 | {"request" -> name}                                                                                         |
    | Histogram    |                             | {"request" -> name}                                                                                         |
    | Histogram    |                                    | {"request" -> name}                                                                                         |
    | Histogram    |                                   | {"request" -> name}                                                                                         |
    | Histogram    |                            | {"request" -> name}                                                                                         |
    | Histogram    |                             | {"request" -> name}                                                                                         |
    | Histogram    |                                    | {"request" -> name}                                                                                         |
    | Gauge        | kafka.server.FetcherLagMetrics.ConsumerLag                                  | {"clientId" -> clientId, "topic" -> topic, "partition" -> partitionId}                                      |
    | Gauge        | kafka.producer.async.ProducerSendThread.ProducerQueueSize                   | {"clientId" -> clientId}                                                                                    |
    | Gauge        | kafka.log.Log.NumLogSegments                                                | {"topic" -> topic, "partition" -> partition}                                                                |
    | Gauge        | kafka.log.Log.LogEndOffset                                                  | {"topic" -> topic, "partition" -> partition}                                                                |
    | Gauge        | kafka.cluster.Partition.UnderReplicated                                     | {"topic" -> topic, "partition" -> partitionId}                                                              |
    | Gauge        |                               |                                                                                                             |
    | Meter        |                   |                                                                                                             |
    | Meter        |                                      | {"networkProcessor" -> i.toString}                                                                          |
    | Gauge        | kafka.server.OffsetManager.NumOffsets                                       |                                                                                                             |
    | Gauge        | kafka.server.OffsetManager.NumGroups                                        |                                                                                                             |
    | Gauge        | kafka.consumer.ZookeeperConsumerConnector.OwnedPartitionsCount              | {"clientId" -> config.clientId, "groupId" -> config.groupId}                                                |
    | Gauge        | kafka.consumer.ZookeeperConsumerConnector.OwnedPartitionsCount              | {"clientId" -> config.clientId, "groupId" -> config.groupId, "topic" -> topic}                              |
    | Meter        | kafka.consumer.ZookeeperConsumerConnector.KafkaCommitsPerSec                | {"clientId" -> clientId}                                                                                    |
    | Meter        | kafka.consumer.ZookeeperConsumerConnector.ZooKeeperCommitsPerSec            | {"clientId" -> clientId}                                                                                    |
    | Meter        | kafka.consumer.ZookeeperConsumerConnector.RebalanceRateAndTime              | {"clientId" -> clientId}                                                                                    |
    | Meter        |                   |                                                                                                             |
    | Meter        |                  |                                                                                                             |
    | Histogram    |                        |                                                                                                             |
    | Gauge        | kafka.common.AppInfo.Version                                                |                                                                                                             |
    | Meter        | kafka.server.KafkaRequestHandlerPool.RequestHandlerAvgIdlePercent           |                                                                                                             |
    | Meter        | kafka.util.Throttler."""a input string not with small cardinality"""        |                                                                                                             |
    | Gauge        | kafka.log.LogCleaner.max-buffer-utilization-percent                         |                                                                                                             |
    | Gauge        | kafka.log.LogCleaner.cleaner-recopy-percent                                 |                                                                                                             |
    | Gauge        | kafka.log.LogCleaner.max-clean-time-secs                                    |                                                                                                             |
    | Timer        | other.kafka.FetchThread.fetch-thread                                        |                                                                                                             |
    | Timer        | other.kafka.CommitThread.commit-thread                                      |                                                                                                             |
    | Gauge        | kafka.log.Log.LogStartOffset                                                | {"topic" -> topic, "partition" -> partition}                                                                |
    | Gauge        | kafka.log.Log.Size                                                          | {"topic" -> topic, "partition" -> partition}                                                                |
    | Gauge        | kafka.server.KafkaServer.BrokerState                                        |                                                                                                             |
    | Gauge        | kafka.log.LogCleanerManager.max-dirty-percent                               |                                                                                                             |

Metrics-2.x vs Metrics-3.x

The metrics project has two main versions: v2 and v3. Version 3 is not backward compatible.

The Kafka versions supported here depend on metrics-2.2.0.

In a future release, Kafka might upgrade to Metrics-3.x. Due to the incompatibilities between Metrics versions, a new Statsd reporter for metrics-3 will be required.
All contributions welcome!

How to build

After cloning the repo, type

    ./gradlew shadowJar

This produces a jar file in build/libs/.

The shadow jar is a standalone jar.

License & Attributions

This project is released under the Apache License Version 2.0 (APLv2).
