Code Monkey home page Code Monkey logo

kafka-lag-exporter's Introduction

Kafka Lag Exporter gh-release-badge gh-actions-badge license-badge patreon-badge

Monitor Kafka Consumer Group Latency with Kafka Lag Exporter

Overview

Kafka Lag Exporter makes it easy to view the offset lag and calculate an estimate of latency (residence time) of your Apache Kafka consumer groups. It can run anywhere, but it provides features to run easily on Kubernetes clusters against Strimzi Kafka clusters using the Prometheus and Grafana monitoring stack. Kafka Lag Exporter is an Akka Typed application written in Scala.

Kafka Lag Exporter is maintained by Sean Glover (@seglo) and a community of contributors. If you like using this project and would like to support its development, please consider a donation using Patreon.

Kafka Lag Exporter interpolates latency based on observed latest committed offset measurements of consumer groups.

Interpolation

For more information about Kafka Lag Exporter's features see Lightbend's blog post: Monitor Kafka Consumer Group Latency with Kafka Lag Exporter.

Contents

Metrics

Prometheus is a standard way to represent metrics in a modern cross-platform manner. Kafka Lag Exporter exposes several metrics as an HTTP endpoint that can be readily scraped by Prometheus. When installed using Helm and when enabling the Kubernetes pod self-discovery features within Prometheus server, Prometheus server will automatically detect the HTTP endpoint and scrape its data.

kafka_consumergroup_group_offset

Labels: cluster_name, group, topic, partition, member_host, consumer_id, client_id

The offset of the last consumed offset for this partition in this topic partition for this group.

kafka_consumergroup_group_lag

Labels: cluster_name, group, topic, partition, member_host, consumer_id, client_id

The difference between the last produced offset and the last consumed offset for this partition in this topic partition for this group.

kafka_consumergroup_group_lag_seconds

Labels: cluster_name, group, topic, partition, member_host, consumer_id, client_id

The estimated lag in seconds. This metric correlates with lag in offsets. For more information on how this is calculated read the Estimate consumer group lag in time section below.

kafka_consumergroup_group_max_lag

Labels: cluster_name, group, is_simple_consumer

The highest (maximum) lag in offsets for a given consumer group.

kafka_consumergroup_group_max_lag_seconds

Labels: cluster_name, group, is_simple_consumer

The highest (maximum) lag in time for a given consumer group.

kafka_consumergroup_group_sum_lag

Labels: cluster_name, group

The sum of the difference between the last produced offset and the last consumed offset of all partitions for this group.

kafka_consumergroup_group_topic_sum_lag

Labels: cluster_name, group, topic

The sum of the difference between the last produced offset and the last consumed offset of all partitions in this topic for this group.

kafka_partition_latest_offset

Labels: cluster_name, topic, partition

The latest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the last produced offset. The last produced offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the last produced offset and the last consumed offset in certain panels.

kafka_partition_earliest_offset

Labels: cluster_name, topic, partition

The earliest offset available for topic partition. Kafka Lag Exporter will calculate a set of partitions for all consumer groups available and then poll for the earliest available offset. The earliest available offset is used in the calculation of other metrics provided, so it is exported for informational purposes. For example, the accompanying Grafana dashboard makes use of it to visualize the offset-based volume of a partition in certain panels.

kafka_consumergroup_poll_time_ms

Labels: cluster_name

The time taken to poll (milli seconds) all the information from all consumer groups for every cluster.

Labels

Each metric may include the following labels when reported. If you define the labels property for configuration of a cluster then those labels will also be included. The superset of all labels defined for all cluster configurations are used for each metric. This is due to a restriction in the Java Prometheus client library that only allows us to define one set of labels per metric. Therefore, if the label names across cluster configurations are not consistent then the missing labels for each cluster will appear as blank values ("") in the reported metric. An alternative to defining labels in Kafka Lag Exporter is to define relabeling rules in your Prometheus server configuration.

  • cluster_name - Either the statically defined Kafka cluster name, or the metadata.name of the Strimzi Kafka cluster that was discovered with the Strimzi auto discovery feature.
  • topic - The Kafka topic.
  • partition - The Kafka partition.
  • group - The Kafka consumer group.id.

The rest of the labels are passed along from the consumer group metadata requests.

  • member_host - The hostname or IP of the machine or container running the consumer group member that is assigned this partition.
  • client_id - The id of the consumer group member. This is usually generated automatically by the group coordinator.
  • consumer_id - The globally unique id of the consumer group member. This is usually a combination of the client_id and a GUID generated by the group coordinator.

Prometheus server may add additional labels based on your configuration. For example, Kubernetes pod information about the Kafka Lag Exporter pod where the metrics were scraped from.

Run on Kubernetes

Configuration

Details for configuration for the Helm Chart can be found in the values.yaml file of the accompanying Helm Chart.

Install with Helm

You can install the chart from the chart repository at the following location

helm repo add kafka-lag-exporter https://seglo.github.io/kafka-lag-exporter/repo/
helm repo update

helm install kafka-lag-exporter/kafka-lag-exporter 

Examples

Install with the Strimzi Kafka discovery feature. See Strimzi Kafka Cluster Watcher for more details.

helm install kafka-lag-exporter/kafka-lag-exporter \
  --name kafka-lag-exporter \
  --namespace kafka-lag-exporter \
  --set watchers.strimzi=true

Install with statically defined cluster at the CLI.

helm install kafka-lag-exporter/kafka-lag-exporter \
  --name kafka-lag-exporter \
  --namespace myproject \
  --set clusters\[0\].name=my-cluster \
  --set clusters\[0\].bootstrapBrokers=my-cluster-kafka-bootstrap:9092

Install with statically defined cluster at the CLI, but with a non-default service account assigned to the deployment.

helm install kafka-lag-exporter/kafka-lag-exporter \
  --name kafka-lag-exporter \
  --namespace myproject \
  --set clusters\[0\].name=my-cluster \
  --set clusters\[0\].bootstrapBrokers=my-cluster-kafka-bootstrap:9092 \
  --set serviceAccount.create=true

Install with Redis persistence enabled

helm install kafka-lag-exporter/kafka-lag-exporter \
  --name kafka-lag-exporter \
  --namespace myproject \
  --set redis.enabled=true \
  --set redis.host=myredisserver \
  --set clusters\[0\].name=my-cluster \
  --set clusters\[0\].bootstrapBrokers=my-cluster-kafka-bootstrap.myproject:9092

Run a debug install (DEBUG logging, debug helm chart install, force docker pull policy to Always).

helm repo update  # force refresh chart version
helm install kafka-lag-exporter/kafka-lag-exporter \
  --name kafka-lag-exporter \
  --namespace myproject \
  --set image.pullPolicy=Always \
  --set logLevel=DEBUG \
  --set clusters\[0\].name=my-cluster \
  --set clusters\[0\].bootstrapBrokers=my-cluster-kafka-bootstrap.myproject:9092 \
  --debug

View the health endpoint

To view the Prometheus health endpoint from outside your Kubernetes cluster, use kubectl port-forward.

Ex)

kubectl port-forward service/kafka-lag-exporter-service 8080:8000 --namespace myproject

View exporter logs

To view the logs of the exporter, identify the pod name of the exporter and use the kubectl logs command.

Ex)

kubectl logs {POD_ID} --namespace myproject -f

Run Standalone

To run the project in standalone mode you must first define a configuration application.conf. This configuration must contain at least connection info to your Kafka cluster (kafka-lag-exporter.clusters). All other configuration has defaults defined in the project itself. See reference.conf for defaults.

Reporters

It is possible to report (either one, multiple or all):

  • to influxdb via the config kafka-lag-exporter.reporters.influxdb
  • to graphite via the config kafka-lag-exporter.reporters.graphite
  • as prometheus via the config kafka-lag-exporter.reporters.prometheus

You must also specify the active reporters in the kafka-lag-exporter.sinks config.

See section below for more information.

Configuration

General Configuration (kafka-lag-exporter{})

Key Default Description
reporters.prometheus.port 8000 The port to run the Prometheus endpoint on
reporters.graphite.host None The graphite host to send metrics to (if not set, will not output to graphite)
reporters.graphite.port None The graphite port to send metrics to (if not set, will not output to graphite)
reporters.graphite.prefix None The graphite metric prefix (if not set, prefix will be empty)
reporters.influxdb.endpoint None The influxdb host to send metrics to (if not set, will not output to influxdb)
reporters.influxdb.port None The influxdb port to send metrics to (if not set, will not output to influxdb)
reporters.influxdb.database kafka_lag_exporter The influxdb database to send metrics to
reporters.influxdb.username None The influxdb username to connect (if not set, username will be empty)
reporters.influxdb.password None The influxdb password to connect (if not set, password will be empty)
reporters.influxdb.async true Flag to enable influxdb async non-blocking write mode to send metrics
sinks ["PrometheusEndpointSink"] Specify which reporters must be used to send metrics. Possible values are: PrometheusEndpointSink, InfluxDBPusherSink, GraphiteEndpointSink. (if not set, only Prometheus is activated)
poll-interval 30 seconds How often to poll Kafka for latest and group offsets
lookup-table.memory.size 60 The maximum window size of the in memory look up table per partition
lookup-table.redis {} Configuration for the Redis persistence. This category is optional and will override use of the in memory lookup table if defined
client-group-id kafkalagexporter Consumer group id of kafka-lag-exporter's client connections
kafka-client-timeout 10 seconds Connection timeout when making API calls to Kafka
clusters [] A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature
watchers {} Settings for Kafka cluster "watchers" used for auto-discovery.
metric-whitelist [".*"] Regex of metrics to be exposed via Prometheus endpoint. Eg. [".*_max_lag.*", "kafka_partition_latest_offset"]

Kafka Cluster Connection Details (kafka-lag-exporter.clusters[])

Key Default Required Description
name "" Yes A unique cluster name to for this Kafka connection detail object
bootstrap-brokers "" Yes Kafka bootstrap brokers. Comma delimited list of broker hostnames
group-whitelist [".*"] No A list of Regex of consumer groups monitored. For example, if you only wish to expose certain groups with input and output prefixes, use ["^input-.+", "^output-.+"].
group-blacklist [] No A list of Regex of consumer groups not monitored. For example, if you wish to not expose certain groups, use either ["^unmonitored-group.+"] or ["unmonitored-group1", "unmonitored-group2"].
topic-whitelist [".*"] No A list of Regex of topics monitored. For example, if you only wish to expose certain topics, use either ["^topic.+"] or ["topic1", "topic2"].
topic-blacklist [] No A list of Regex of topics not monitored. For example, if you wish to not expose certain topics, use either ["^unmonitored-topic.+"] or ["unmonitored-topic1", "unmonitored-topic2"].
consumer-properties {} No A map of key value pairs used to configure the KafkaConsumer. See the Consumer Config section of the Kafka documentation for options.
admin-client-properties {} No A map of key value pairs used to configure the AdminClient. See the Admin Config section of the Kafka documentation for options.
labels {} No A map of key value pairs will be set as additional custom labels per cluster for all the metrics in prometheus.

Redis Details (kafka-lag-exporter.lookup-table.redis{})

Key Default Required Description
database 0 No Redis database number.
host "localhost" No Redis server to use.
port 6379 No Redis port to use.
timeout 60 No Redis connection timeout.
prefix "kafka-lag-exporter" No Prefix used by all the keys.
separator ":" No Separator used to build the keys.
retention "1 day" No Retention of the lookup table. Points will get removed from the table after that.
expiration "1 day" No Expiration (TTL) of all the keys.

Watchers (kafka-lag-exporters.watchers{})

Key Default Description
strimzi false Toggle for using Strimzi auto-discovery.

Ex) Expose metrics on port 9999, double the default lookup table size, and define client.id's for the KafkaConsumer and AdminClient used by the project.

kafka-lag-exporter {
  reporters {
    prometheus {
      port = 9999
    }
  }
  lookup-table.memory.size = 120
  clusters = [
    {
      name = "a-cluster"
      bootstrap-brokers = "a-1.cluster-a.xyzcorp.com:9092,a-2.cluster-a.xyzcorp.com:9092,a-3.cluster-a.xyzcorp.com:9092"
      topic-whitelist = [
        "widgets-.+"
      ]
      consumer-properties = {
        client.id = "consumer-client-id"
      }
      admin-client-properties = {
        client.id = "admin-client-id"
      }
      labels = {
        location = "ny"
        zone = "us-east"
      }
    }
  ]
}

Run as Java App

Download the release zip file (kafka-lag-exporter-{VERSION}.zip) from the GitHub release page. Extract its contents and run the ./bin/kafka-lag-exporter shell script.

Ex)

./bin/kafka-lag-exporter \
    -Dconfig.file=/opt/docker/conf/application.conf \ 
    -Dlogback.configurationFile=/opt/docker/conf/logback.xml

Run as Docker Image

Define an application.conf and optionally a logback.xml with your configuration.

Run the Docker image. Expose metrics endpoint on the host port 8000. Mount a config dir with your application.conf and logback.xml into the container.

Ex)

docker run -p 8000:8000 \
    -v $(pwd):/opt/docker/conf/ \
    seglo/kafka-lag-exporter:0.7.0 \
    /opt/docker/bin/kafka-lag-exporter \
    -Dconfig.file=/opt/docker/conf/application.conf \
    -Dlogback.configurationFile=/opt/docker/conf/logback.xml

See full example in ./examples/standalone.

Troubleshooting

If you observe Kafka Lag Exporter reporting odd or inconsistent metric data then before creating an issue please enable DEBUG logging to get raw data consumed from Kafka used to calculate metrics that are exported. If this logging does not help you resolve the problem then include logs, and your application configuration in a new GitHub issue.

Ex)

2020-08-31 16:14:06,478 DEBUG [default-dispatcher-3] [c.l.k.ConsumerGroupCollector$       ]  Received Offsets Snapshot:

Timestamp: 1598904846431
Groups: group-1-1
Earliest Offsets:
  Topic                                                           Partition  Earliest
  topic-1-2                                                       0          0
Latest Offsets:
  Topic                                                           Partition  Offset
  topic-1-2                                                       0          11
Last Group Offsets:
  Group                                                           Topic                                                           Partition  Offset
  group-1-1                                                       topic-1-2                                                       0          5

If installing with Helm then you can enable DEBUG logging with the kafkaLogLevel configuration in the chart's [values.yaml](https://github.com/seglo/kafka-lag-exporter/blob/master/charts/kafka-lag-exporter/values.yaml).

When running in standalone mode you can either define assign the KAFKA_LAG_EXPORTER_KAFKA_LOG_LEVEL environment variable to DEBUG, or override the log level of com.lightbend.kafkalagexporter directly in the logback.xml.

Required Permissions for Kafka ACL

Kafka Lag Exporter (kafka-lag-exporter) requires the DESCRIBE operation permission for consumer groups and topics at the cluster level.

ACLs for principal `User:kafka-lag-exporter`
Current ACLs for resource `Cluster:LITERAL:kafka-cluster`: 
 	User:kafka-lag-exporter has Allow permission for operations: Describe from hosts: * 

Current ACLs for resource `Group:LITERAL:*`: 
 	User:kafka-lag-exporter has Allow permission for operations: Describe from hosts: * 

Current ACLs for resource `Topic:LITERAL:*`: 
 	User:kafka-lag-exporter has Allow permission for operations: Describe from hosts: * 

This can be added using the following command (authorizer-properties depends on the Kafka installation):

kafka-acls --authorizer-properties "zookeeper.connect=localhost:2181" --add --allow-principal "User:kafka-lag-exporter" --operation DESCRIBE --group '*' --topic '*' --cluster

Strimzi Kafka Cluster Watcher

When you install the chart with --set watchers.strimzi=true then the exporter will create a new ClusterRole and ClusterRoleBinding to allow for the automatic discovery of Strimzi Kafka clusters. The exporter will watch for Kafka resources to be created or destroyed. If the cluster already exists, or was created while the exporter was online then it will automatically begin to collect consumer group metadata and export it. If a Kafka resource is destroyed then it will stop collecting consumer group metadata for that cluster.

The exporter will name the cluster the same as Kafka resources metadata.name field.

Monitoring with Grafana

A sample Grafana dashboard is provided in ./grafana/. It can be imported into a Grafana server that is configured with a Prometheus datasource that is reading the Kafka Lag Exporter's Prometheus health endpoint.

The dashboard contains several high level user-configurable variables.

  • Namespace - The namespace of the Kafka Lag Exporter. Only 1 namespace can be selected at a time.
  • Cluster Name - The name of the Kafka cluster. Only 1 cluster name can be selected at a time.
  • Consumer Group - The name of the Consumer Group. This is a multi-select list which allows you to view the dashboard for 1 to All consumer groups.

This dashboard has 4 rows that are described below.

  1. All Consumer Group Lag - A high level set of 4 panels.
  • Consumer Group Max Time Lag
  • Consumer Group Time Lag Top Partitions
  • Consumer Group Max Offset Lag
  • Consumer Group Offset Lag Top Partitions Consumer Group Max Time Lag
  1. Max Consumer Group Time Lag Over Offset Lag - One panel for each consumer group that shows the max lag in time on the left Y axis and max lag in offsets on the right Y axis. Ex) Max Consumer Group Time Lag Over Offset Lag Example
  2. Max Consumer Group Time Lag Over Summed Offsets - One panel for each consumer group that shows the max lag in time on the left Y axis. The right Y axis has the sum of latest and last consumed offsets for all group partitions. Ex) Max Consumer Group Time Lag Over Summed Offsets
  3. Kafka Lag Exporter JVM Metrics - JVM metrics for the Kafka Lag Exporter itself.

Filtering Metrics without Prometheus Server

It's possible to filter specific metric names using HTTP query parameters to the metrics health endpoint.

To filter 1 or more metrics use the query parameter pattern of name[]=prometheus_metric_name.

Ex)

$ curl -X GET -g http://localhost:8080?name[]=kafka_consumergroup_group_max_lag
# HELP kafka_consumergroup_group_max_lag Max group offset lag
# TYPE kafka_consumergroup_group_max_lag gauge
kafka_consumergroup_group_max_lag{cluster_name="pipelines-strimzi",group="variable-throughput-runtime.f3-merge.in01",} 52.0
...

This is an undocumented feature of the Prometheus HTTP server. For reference consult the parseQuery method for the HTTP server in the prometheus/client_java GitHub repository.

Health Check

kafka_consumergroup_poll_time_ms metric exposes the time taken the poll all the consumer group information for every cluster. This can be used as health check endpoint and optionally fail the health check if it's greater than some value (longer than the poll interval) Ex: $ curl -X GET -g http://localhost:8000/metrics?name[]=kafka_consumergroup_poll_time_ms

Development

Tests

Kafka Lag Exporter has unit and integration tests. The integration tests use Alpakka Kafka Testkit to provide an embedded Kafka instance and simulate consumer group lag.

Run all tests with SBT.

sbt test

Testing with local docker-compose.yaml

A Docker Compose cluster with producers and multiple consumer groups is defined in ./docker/docker-compose.yaml. This is useful to manually test the project locally, without K8s infrastructure. These images are based on the popular wurstmeister Apache Kafka Docker images. Confirm you match up the version of these images with the correct version of Kafka you wish to test.

To configure cluster connection info either create an application.conf or pass environment variables.

KAFKA_LAG_EXPORTER_CLUSTERS.0.name=default
KAFKA_LAG_EXPORTER_CLUSTERS.0.bootstrap-brokers=localhost:9094

Remove any previous volume state.

docker-compose rm -f

Start up the cluster in the foreground.

docker-compose up

Building your own Helm Chart

If you want to build your own Helm Chart and accompanying docker images you can override the Docker repository and username with environment variables.

DOCKER_REPOSITORY - A custom Docker repository, such as a private company's docker repository (defaults to DockerHub) DOCKER_USERNAME - A custom Docker username (defaults to seglo)

Run the updateHelmChart sbt task to update the Helm Chart with the appropriate Docker repository and username.

Run the docker:publishLocal sbt task to publish a local Docker image.

Run the docker:publish sbt task to publish the Docker image to the specified Docker repository.

For example, to update the Helm Chart to use a custom docker registry and username and to publish the chart locally.

$ export DOCKER_REPOSITORY="docker.xyzcorp.com"
$ export DOCKER_USERNAME="foobar"
$ sbt updateHelmChart docker:publishLocal
[info] Loading settings for project global-plugins from idea.sbt ...
[info] Loading global plugins from /home/seglo/.sbt/1.0/plugins
[info] Loading settings for project kafka-lag-exporter-build from plugins.sbt ...
[info] Loading project definition from /home/seglo/source/kafka-lag-exporter/project
[info] Loading settings for project kafka-lag-exporter from version.sbt,build.sbt ...
[info] Set current project to kafka-lag-exporter (in build file:/home/seglo/source/kafka-lag-exporter/)
Update Chart.yaml appVersion to 0.4.0-SNAPSHOT and version to 0.4.0
Update values.yaml docker image tag to 0.4.0-SNAPSHOT
Update values.yaml docker repository to docker.xyzcorp.com/foobar/kafka-lag-exporter
...
[info] Successfully built f392402958b7
[info] Successfully tagged docker.xyzcorp.com/foobar/kafka-lag-exporter:0.4.0-SNAPSHOT
[info] Built image docker.xyzcorp.com/foobar/kafka-lag-exporter with tags [0.4.0-SNAPSHOT]
[success] Total time: 17 s, completed 1-May-2019 2:37:28 PM

Deploy the local chart to K8s:

helm install ./charts/kafka-lag-exporter \
  --name kafka-lag-exporter \
  --namespace kafka-lag-exporter \
  --set watchers.strimzi=true \
  --set kafkaLagExporterLogLevel=DEBUG \
  --set image.pullPolicy=Always

Release

The release process is run when a new tag is pushed to the repository. Release steps:

  1. Run doctoc README.md
  2. Update change log docker run -it --rm -v "$(pwd)":/usr/local/src/your-app githubchangeloggenerator/github-changelog-generator -u seglo -p kafka-lag-exporter -t $(cat ~/.ghtoken-personal) --no-unreleased --no-issues --since-tag v0.6.7
  3. Push a new tag git tag -a v0.7.0 -m "v0.7.0" && git push origin --tags

Change log

See CHANGELOG.md

kafka-lag-exporter's People

Contributors

abhishekjiitr avatar anbarasantr avatar github-actions[bot] avatar graphex avatar guillaume-roland-cd avatar jorgelbg avatar judomu avatar julien-lafont avatar khorshuheng avatar killuazhu avatar lilyevsky avatar lukaszkrawiec avatar msravan avatar nbontempo avatar nequissimus avatar nlamirault avatar robsonpeixoto avatar rovernekar avatar ryan-dyer-sp avatar saminahbab avatar scala-steward avatar seglo avatar slachiewicz avatar soloradish avatar sverrehu avatar terjesannum avatar thiagosilvasilveira avatar uishon avatar warprat avatar yazgoo avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-lag-exporter's Issues

consumers without stored offsets in partition reported wrongly as lag

exporter and kafka-consumer-groups command line tools give different answers on lag.

When topic is not used anymore, but still exists and is subscribed to by consumer (kafka mirror maker, no events from that).

It is reported by kafka-consumer-groups command line tool like following:
CURRENT-OFFSET=-
LOG-END-OFFSET=1234234
LAG=-

But with this exporter reports it reported as super high consumer lag, 1234234.

This causes confusion as there is nothing consumer can do to fix it.

Select individual metrics

I am looking to use this to build a Kubernetes HPA based on consumer lag.

In order for this to be much easier, it would be nice if I could select a single specific metric.

If something like http://localhost:8000/kafka_consumergroup_group_lag_seconds?cluster_name=dev&topic=high-volume-topic&group=consumer-group returned just those metrics I am querying for, I could easily grab the max value and use that to have Kubernetes decide whether I need more pods. Similarly, I could spin down pods if that number is low.

Support for turning off some metrics

Problem: The amount of data exposed to prometheus grows very large when cluster contains a lot topics (100+) multiplied by partitions count (3...32).

I'd like to turn off some verbose metrics, but keep the ones that show group max. Right now did commented it out from code, but configuration option would rock (I removed LatestOffsetMetric, LastGroupOffsetMetric, TimeLagMetric)

Add blacklist of consumer groups

As mentioned in #4 a consumer group name blacklist in addition to the whitelist would also be useful for ignoring console consumers, for example.

Add /health endpoint

Currently / and /metrics return the full metrics. It'd be handy if there was a simple /health endpoint returning 200 OK to indicate service availability instead of returning metrics at all time.

Metadata poll timer metric

Add a metric to kafka-lag-exporter that reports how long it takes to complete a poll of all information from all consumer groups.

Add option to always collect metrics for all topics

As mentioned in #66, instead of only collecting topic metrics for topic partitions with an active consumer, sometimes it is important to collect metrics for topics without an active consumer, so that unconsumed partition alerts can be generated.

Metrics not updated when a consumer group is not active

Hi @seglo.
I have the same problem => #36
Ex.
I have group consumers which read topic.
image
When I stop read topic I see on the chart empty data(no data)
When I run consumer group lag and I see data on chart
This is very critical becouse in this is time consumer lag not monitoring
any idea?
Version exporter 0.5.1

Poll interval: 10 seconds
Lookup table size: 8192
Prometheus metrics endpoint port: 8000
Admin client consumer group id: kafkalagexporter
Kafka client timeout: 10 seconds
Statically defined Clusters:

  Cluster name: kafka_general
  Cluster Kafka bootstrap brokers: broker-01:9092
     
Watchers:
  Strimzi: false
      
2019-09-16 05:14:11,329 INFO  c.l.k.KafkaClusterManager$ akka://kafka-lag-exporter/user - Cluster Added: 
  Cluster name: kafka_general
  Cluster Kafka bootstrap brokers: broker-01:9092
      
2019-09-16 05:14:11,344 INFO  c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-kafka_general - Spawned ConsumerGroupCollector for cluster: kafka_general 
2019-09-16 05:14:11,355 INFO  c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-kafka_general - Collecting offsets 
2019-09-16 05:14:11,384 INFO  o.a.k.c.admin.AdminClientConfig  - AdminClientConfig values: 
	bootstrap.servers = [broker-01:9092]
	client.dns.lookup = default
	client.id = 
	connections.max.idle.ms = 300000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 10000
	retries = 0
	retry.backoff.ms = 1000
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
 
2019-09-16 05:14:11,537 INFO  o.a.kafka.common.utils.AppInfoParser  - Kafka version: 2.2.1 
2019-09-16 05:14:11,537 INFO  o.a.kafka.common.utils.AppInfoParser  - Kafka commitId: 55783d3133a5a49a 
2019-09-16 05:14:12,105 INFO  o.a.k.c.consumer.ConsumerConfig  - ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [broker-01:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = kafkalagexporter
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 10000
	retry.backoff.ms = 1000
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Setup MicroK8s test harness

Setup microk8s on the travis build so the helm chart can be deployed as part of a full Kubernetes integration test.

Additional Derivative Metrics

Knowing the earliest and latest available offsets allows for several derivative metrics, which would be relatively easy to calculate, and certainly easier to do here than query side. Here are the ones I can think of so far:

  • kafka_partition_volume = kafka_partition_latest_offset - kafka_partition_earliest_offset
  • kafka_consumergroup_buffer = kafka_consumergroup_group_offset - kafka_partition_earliest_offset "The opposite side of lag"
  • kafka_consumergroup_lag_percent = kafka_consumergroup_lag / kafka_partition_volume
  • kafka_consumergroup_buffer_percent = kafka_consumergroup_buffer / kafka_partition_volume
  • kafka_partition_buffer_seconds Applying the same estimation technique as kafka_consumergroup_group_lag_seconds

The caveat is of course that compacted topics, where offsets aren't monotonically increasing, will not factor in the gaps in their offsets. IMHO this is enough of an edge case to just call it out in the metric definitions.

In addition to the above, it can be very useful for summary dashboards to display per-topic rollups, treating each topic/consumer pair (or topic/none pair) as a single metric, essentially just prefixing the metrics with kafka_aggregate_ for the rollup. This won't catch individual partitions being skewed or falling behind on consumption, but is great to have when building a dashboard you can drill down from. Magnifying glass vs microscope :)

not active consumer groups last value still reported under MAX lag/time

When consumer group started consumed and left, the offset

kafka-consumer-groups <- shows group in list, but when described
Consumer group 'name' has no active members.

kafka_consumergroup_group_max_lag
kafka_consumergroup_group_max_lag_seconds

Both show last seen value, which can be huge.

But there is nothing for this group under, etc per partition metrics.
kafka_consumergroup_group_lag

Seems max is not updated when consumer group leaves

Interpolate for less variable consumer lag in time estimates in Kafka Lag Exporter

Today Kafka Lag Exporter simply polls on a small interval and extrapolates lag based on two measurements from the past two intervals. This works reasonably well with a steady consumption pattern, but if consuming “flattens out” during one of those intervals than it can lead to very inflated lag in time estimates.

A solution is to interpolate based on a sliding window of measurements of the latest offset measurement. We capture latest offsets for a partition every interval, then to calculate lag in time we lookup the interval our last read offset is in. We interpolate to find the timestamp for what the offset was between that measured interval, then use it to determine how far behind the current consumer is.

Support for arbitrary Kafka Config

Hi,
I would like to be able to add arbitrary kafka configuration options to the clusters.
I think that would be easier and nicer to implement than explicitly defining a set of supported options.

Example (Support for SSL):

KAFKA_LAG_EXPORTER_CLUSTERS.0.security-protocol=SSL
KAFKA_LAG_EXPORTER_CLUSTERS.0.keystore-location=/var/private/ssl/keystores/client.keys
KAFKA_LAG_EXPORTER_CLUSTERS.0.keystore-password=…
KAFKA_LAG_EXPORTER_CLUSTERS.0.key-password=…
KAFKA_LAG_EXPORTER_CLUSTERS.0.truststore-location=/var/private/ssl/keystores/cluster.keys
KAFKA_LAG_EXPORTER_CLUSTERS.0.truststore-password=…

Open Source project

Before going public

  • Review by Justin

Confirm after making repo public

  • Add CLA validation step
  • Update Travis build status badge to use travis-ci.org
  • Verify GitHub release badge works with latest version

Use config map for Typesafe config

Instead of using environment variables for all configuration options, create a config map with the Helm and substitute chart values there. Mount config map into Kafka Lag Exporter pod.

Kafka lag metric not updated when a consumer group is not active

Hi!
This is a big problem, because you basically don't have info about lag if all your consumer instances (clients) become unavailable.

How to reproduce:

  1. Start a Consumer group with two instances (two clients)
  2. Check lag metric
  3. After some time stop both instances
  4. Check lag metric and observe that after the point when both instances were stopped, there are no more data points

Add a sum of lag per consumer group

It is convenient to produce a sum of the lag per topic/group. This is briefly mentioned in #72.

In my case we have clusters with many hundreds of consumer groups. While we scrape the granular data for our Prometheus instances, we also run a side-car DataDog agent to collect metrics from exporters and push important telemetry into that system. There is extra cost associated with the cardinality of kafka_consumergroup_group_lag, so having this rolled-up at the source is convenient.

Proposing:

kafka_consumergroup_group_total_lag

Labels: cluster_name, group, topic

Support for custom / additional labels

It would be great to have a possibility to specify custom / additional labels which could be specified per cluster.

Example:

kafka-lag-exporter {
  port = 9999
  lookup-table-size = 60
  clusters = [
    {
      name = "us-east_ny_a-cluster"                                   
      bootstrap-brokers = "us-east_ny_a-1.cluster-a.xyzcorp.com:9092,us-east_ny_a-2.cluster-a.xyzcorp.com:9092,us-east_ny_a-3.cluster-a.xyzcorp.com:9092"
      labels {
        location = "ny"
        zone = "us-east"
      }
    }
  ]
}

And this should add to all metrics for this cluster those two labels with given values.

This would simplify the Prometheus job config. Because now Prometheus relabelling tricks must be applied to get this labels.

Wrong lag metrics when consumer groups have similar names

If you have a Consumer group which is called "clickhouse-default-push_user_events_queue_fr5" and after some time (few hours for example) you add a new CG with a similar name: "test-clickhouse-default-push_user_events_queue_fr5", the information about group lag for the first group will be wrong - will actually show lag info for the second - similarly named group.

Here are some screenshots...

Here you can observe a change in lag pattern when the second group was added. And you can see that at 17:05:50 the lag for CG "clickhouse-default-push_user_events_queue_fr5" was 2001.
wrong_lag_when_similar_group_1

Here you can see that the lag for the second (new) CG - "test-clickhouse-default-push_user_events_queue_fr5" is actually 2001 at the same point in time.
wrong_lag_when_similar_group_2

And here is tha lag tracked by Burrow:
wrong_lag_when_similar_group_3
And here you can observe that the actual lag for the first CG ("clickhouse-default-push_user_events_queue_fr5") is actually 223145 at the same time.

So I guess some kind of "contains" is used to find matching CG inside the data structures of your project.

This is manifested when groups start or end with the same substring.

Parser error with SASL authentication with special charcters

Hi,

I'm trying to configure Kafka-lag-exporter using Helm with a Kafka Broker using SASL PLAIN authentication.
My password has + character and I got this error:

kl pod/kafka-lag-exporter-64c7946b98-kd9lc
Exception in thread "main" com.typesafe.config.ConfigException$Parse: /opt/docker/conf/application.conf: 13: Expecting close brace } or a comma, got 'E' ('+' not followed by =, 'E' not allowed after '+') (if you intended 'E' ('+' not followed by =, 'E' not allowed after '+') to be part of a key or string value, try enclosing the key or value in double quotes, or you may be able to rename the file .properties rather than .conf)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseError(ConfigDocumentParser.java:201)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseError(ConfigDocumentParser.java:197)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseObject(ConfigDocumentParser.java:528)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseValue(ConfigDocumentParser.java:247)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.consolidateValues(ConfigDocumentParser.java:152)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseObject(ConfigDocumentParser.java:473)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseValue(ConfigDocumentParser.java:247)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.consolidateValues(ConfigDocumentParser.java:152)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseArray(ConfigDocumentParser.java:551)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseValue(ConfigDocumentParser.java:249)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.consolidateValues(ConfigDocumentParser.java:152)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseObject(ConfigDocumentParser.java:473)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseValue(ConfigDocumentParser.java:247)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parseObject(ConfigDocumentParser.java:458)
        at com.typesafe.config.impl.ConfigDocumentParser$ParseContext.parse(ConfigDocumentParser.java:648)
        at com.typesafe.config.impl.ConfigDocumentParser.parse(ConfigDocumentParser.java:14)
        at com.typesafe.config.impl.Parseable.rawParseValue(Parseable.java:260)
        at com.typesafe.config.impl.Parseable.rawParseValue(Parseable.java:248)
        at com.typesafe.config.impl.Parseable.parseValue(Parseable.java:180)
        at com.typesafe.config.impl.Parseable.parseValue(Parseable.java:174)
        at com.typesafe.config.impl.Parseable.parse(Parseable.java:299)
        at com.typesafe.config.ConfigFactory.parseFile(ConfigFactory.java:689)
        at com.typesafe.config.DefaultConfigLoadingStrategy.parseApplicationConfig(DefaultConfigLoadingStrategy.java:51)
        at com.typesafe.config.ConfigFactory.defaultApplication(ConfigFactory.java:473)
        at com.typesafe.config.ConfigFactory$1.call(ConfigFactory.java:259)
        at com.typesafe.config.ConfigFactory$1.call(ConfigFactory.java:256)
        at com.typesafe.config.impl.ConfigImpl$LoaderCache.getOrElseUpdate(ConfigImpl.java:65)
        at com.typesafe.config.impl.ConfigImpl.computeCachedConfig(ConfigImpl.java:92)
        at com.typesafe.config.ConfigFactory.load(ConfigFactory.java:256)
        at com.typesafe.config.ConfigFactory.load(ConfigFactory.java:232)
        at com.lightbend.kafkalagexporter.MainApp$.start$default$1(MainApp.scala:24)
        at com.lightbend.kafkalagexporter.MainApp$.delayedEndpoint$com$lightbend$kafkalagexporter$MainApp$1(MainApp.scala:16)
        at com.lightbend.kafkalagexporter.MainApp$delayedInit$body.apply(MainApp.scala:15)
        at scala.Function0.apply$mcV$sp(Function0.scala:39)
        at scala.Function0.apply$mcV$sp$(Function0.scala:39)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
        at scala.App.$anonfun$main$1$adapted(App.scala:80)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.App.main(App.scala:80)
        at scala.App.main$(App.scala:78)
        at com.lightbend.kafkalagexporter.MainApp$.main(MainApp.scala:15)
        at com.lightbend.kafkalagexporter.MainApp.main(MainApp.scala)

My values.yaml:

clusters:
 - name: "instance-europe-west1"
   bootstrapBrokers: "broker:9092"
   consumerProperties:
        bootstrap.servers: broker:9092
        security.protocol: SASL_SSL
        sasl.mechanism: PLAIN
        sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password with +";"
        ssl.endpoint.identification.algorithm: https
   # https://kafka.apache.org/documentation/#adminclientconfigs
   adminClientProperties:
        bootstrap.servers: broker:9092
        security.protocol: SASL_SSL
        sasl.mechanism: PLAIN
        sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password with +";"
        ssl.endpoint.identification.algorithm: https

kafkaLogLevel: DEBUG

I've already tried to escape both " and + with , but I got credential error.

Any ideas?
Thanks a lot!

Incorrect kafka_consumergroup_group_offset Metric

All the kafka_consumergroup_group_offset metrics for various consumer groups are all zero which leads to the lag metrics being insanely large (i.e. equal to the latest offsets), which is incorrect as the same metrics are shown correctly using kafka-consumer-groups.sh

Helm Chart used: kafka-lag-exporter-0.4.1
Kafka Version used: 1.1.0
Kafka is deployed on kubernetes, and is not a Strimzi Kafka

Debug logs also look fine, no errors:

2019-06-14 07:45:46,218 WARN  o.a.k.c.admin.AdminClientConfig  - The configuration 'sasl.jaas.config' was supplied but isn't a known config. 

2019-06-14 07:45:46,480 INFO  o.a.kafka.common.utils.AppInfoParser  - Kafka version: 2.2.1 
2019-06-14 07:45:46,480 INFO  o.a.kafka.common.utils.AppInfoParser  - Kafka commitId: 55783d3133a5a49a 
2019-06-14 07:45:46,529 INFO  org.apache.kafka.clients.Metadata  - Cluster ID: 3r4hHZncSxSz0vhHRSRuMA 
2019-06-14 07:45:46,567 DEBUG c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-centrallogging - Updating lookup tables 
2019-06-14 07:45:46,602 DEBUG c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-centrallogging - Reporting offsets 
2019-06-14 07:45:46,657 DEBUG c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-centrallogging - Clearing evicted metrics 
2019-06-14 07:45:46,658 DEBUG c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-centrallogging - Polling in 5 seconds 
2019-06-14 07:45:51,678 DEBUG c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-centrallogging - Collecting offsets 
2019-06-14 07:45:51,776 DEBUG c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-centrallogging - Updating lookup tables 
2019-06-14 07:45:51,779 DEBUG c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-centrallogging - Reporting offsets 
2019-06-14 07:45:51,823 DEBUG c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-centrallogging - Clearing evicted metrics 
2019-06-14 07:45:51,823 DEBUG c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-centrallogging - Polling in 5 seconds 
2019-06-14 07:45:56,838 DEBUG c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-centrallogging - Collecting offsets 

"Invalid negative offset" java.lang.IllegalArgumentException

Couldn't find the exact offset by command line tool

kafka-lag-exporter | 2019-07-24 10:33:24,763 ERROR c.l.k.ConsumerGroupCollector$ akka://kafka-lag-exporter/user/consumer-group-collector-kafka-company-db - Supervisor RestartSupervisor saw failure: A failure occurred while retrieving offsets.  Shutting down. java.lang.Exception: A failure occurred while retrieving offsets.  Shutting down.
kafka-lag-exporter | 	at com.lightbend.kafkalagexporter.ConsumerGroupCollector$CollectorBehavior.$anonfun$collector$1(ConsumerGroupCollector.scala:172)
kafka-lag-exporter | 	at akka.actor.typed.internal.BehaviorImpl$ReceiveBehavior.receive(BehaviorImpl.scala:37)
kafka-lag-exporter | 	at akka.actor.typed.Behavior$.interpret(Behavior.scala:437)
kafka-lag-exporter | 	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:393)
kafka-lag-exporter | 	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:52)
kafka-lag-exporter | 	at akka.actor.typed.internal.RestartSupervisor.aroundReceive(Supervision.scala:248)
kafka-lag-exporter | 	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:79)
kafka-lag-exporter | 	at akka.actor.typed.Behavior$.interpret(Behavior.scala:437)
kafka-lag-exporter | 	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:393)
kafka-lag-exporter | 	at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:121)
kafka-lag-exporter | 	at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:102)
kafka-lag-exporter | 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
kafka-lag-exporter | 	at akka.actor.ActorCell.invoke(ActorCell.scala:581)
kafka-lag-exporter | 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
kafka-lag-exporter | 	at akka.dispatch.Mailbox.run(Mailbox.scala:229)
kafka-lag-exporter | 	at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
kafka-lag-exporter | 	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
kafka-lag-exporter | 	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
kafka-lag-exporter | 	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
kafka-lag-exporter | 	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
kafka-lag-exporter | Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Invalid negative offset
kafka-lag-exporter | 	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
kafka-lag-exporter | 	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
kafka-lag-exporter | 	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
kafka-lag-exporter | 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
kafka-lag-exporter | 	at com.lightbend.kafkalagexporter.KafkaClient$.$anonfun$kafkaFuture$1(KafkaClient.scala:47)
kafka-lag-exporter | 	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
kafka-lag-exporter | 	at scala.util.Success.$anonfun$map$1(Try.scala:255)
kafka-lag-exporter | 	at scala.util.Success.map(Try.scala:213)
kafka-lag-exporter | 	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
kafka-lag-exporter | 	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
kafka-lag-exporter | 	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
kafka-lag-exporter | 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
kafka-lag-exporter | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
kafka-lag-exporter | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
kafka-lag-exporter | 	at java.lang.Thread.run(Thread.java:748)
kafka-lag-exporter | Caused by: java.lang.IllegalArgumentException: Invalid negative offset
kafka-lag-exporter | 	at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
kafka-lag-exporter | 	at org.apache.kafka.clients.admin.KafkaAdminClient$23$1.handleResponse(KafkaAdminClient.java:2673)
kafka-lag-exporter | 	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1011)
kafka-lag-exporter | 	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1139)
kafka-lag-exporter | 	... 1 common frames omitted

Capture earliest available offset for topics

Sometimes the best metric for alerting purposes is the percent of the currently available offset range that the lag represents, i.e. OffsetLagMetric / (LatestOffsetMetric - EarliestOffsetMetric). In order to have this available, capture of the earliest offsets for a topic partition needs to be added. In the future, this could also be used to report the estimated retention period for a topic, so that if you've specified retention.bytes you could get a reasonable estimate of the number of seconds it takes to hit the specified byte size of the log.

I'm working on a PR for capturing and reporting EarliestOffsetMetric now, though I'm not tackling the seconds estimation piece yet.

Weirdness with subset of topics

OK, this is a weird one and I thought at first this was a dupe of #37 or #45 but I tried using 0.5.0 and this is still an issue for me.

Here is the situation:

  • Kafka 2.1.0 cluster with 3 brokers
  • kafka-lag-exporter v0.5.0

Some data:

$ kafkacat -b broker1:9092 -L -t topic1
Metadata for topic1 (from broker 1001: broker1:9092/1001):
 3 brokers:
  broker 1001 at broker1:9092
  broker 1003 at broker3:9092
  broker 1002 at broker2:9092
 1 topics:
  topic "topic1" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001

Metrics with kafka-lag-exporter v0.4.0:

λ curl -s http://kafka-lag:8000\?name\[\]\=kafka_consumergroup_group_max_lag | grep -v 'NaN$' | grep consumer-group1
kafka_consumergroup_group_max_lag{cluster_name="dev",group="consumer-group1",state="Stable",is_simple_consumer="false",} 0.0

With v0.4.1:

λ curl -s http://kafka-lag:8000\?name\[\]\=kafka_consumergroup_group_max_lag | grep -v 'NaN$' | grep consumer-group1
kafka_consumergroup_group_max_lag{cluster_name="dev",group="consumer-group1",} 473869.0

With v0.4.2+ (incl. v0.5.0):

λ curl -s http://kafka-lag:8000\?name\[\]\=kafka_consumergroup_group_max_lag | grep -v 'NaN$' | grep consumer-group1
kafka_consumergroup_group_max_lag{cluster_name="dev",group="consumer-group1",} 73318.0

I cross-referenced the numbers using kafka-view and I can also see in my statsd metrics that the application is currently not busy.
Here is what kafka-view says:

Topic Partition Size Low mark High mark Current offset Lag Lag %
topic1 0 73318 400551 473869 473869 0 0.0%
  • v0.4.0 reports the correct lag
  • v0.4.1 reports the topic's high-mark
  • All versions since (and including) v0.4.2 seem to report the topic size

Here is where it gets fun:
I also have topics in the same cluster that are reported with correct lag in v0.5.0.
But they are wrong in v0.4.0!
The lag in this one should also be 0 because my logs indicate everything has been consumed.

$ kafkacat -b broker1:9092 -L -t topic2
Metadata for topic2 (from broker 1001: broker1:9092/1001):
 3 brokers:
  broker 1001 at broker1:9092
  broker 1003 at broker3:9092
  broker 1002 at broker2:9092
 1 topics:
  topic "topic2" with 16 partitions:
    partition 0, leader 1002, replicas: 1002,1001,1003, isrs: 1002,1003,1001
    partition 5, leader 1001, replicas: 1001,1002,1003, isrs: 1002,1003,1001
    partition 10, leader 1003, replicas: 1003,1001,1002, isrs: 1003,1002,1001
    partition 15, leader 1002, replicas: 1002,1003,1001, isrs: 1002,1003,1001
    partition 13, leader 1003, replicas: 1003,1002,1001, isrs: 1002,1003,1001
    partition 2, leader 1001, replicas: 1001,1003,1002, isrs: 1002,1003,1001
    partition 8, leader 1001, replicas: 1001,1003,1002, isrs: 1003,1002,1001
    partition 12, leader 1002, replicas: 1002,1001,1003, isrs: 1002,1003,1001
    partition 14, leader 1001, replicas: 1001,1003,1002, isrs: 1002,1003,1001
    partition 9, leader 1002, replicas: 1002,1003,1001, isrs: 1002,1003,1001
    partition 11, leader 1001, replicas: 1001,1002,1003, isrs: 1002,1003,1001
    partition 1, leader 1003, replicas: 1003,1002,1001, isrs: 1003,1002,1001
    partition 4, leader 1003, replicas: 1003,1001,1002, isrs: 1002,1003,1001
    partition 6, leader 1002, replicas: 1002,1001,1003, isrs: 1003,1002,1001
    partition 7, leader 1003, replicas: 1003,1002,1001, isrs: 1002,1003,1001
    partition 3, leader 1002, replicas: 1002,1003,1001, isrs: 1003,1002,1001

v0.4.0:

λ curl -s http://broker1:8000\?name\[\]\=kafka_consumergroup_group_max_lag | grep -v 'NaN$' | grep consumer-group2                 
kafka_consumergroup_group_max_lag{cluster_name="dev",group="consumer-group2",state="Stable",is_simple_consumer="false",} 1344.0

v0.4.1:

λ curl -s http://broker1:8000\?name\[\]\=kafka_consumergroup_group_max_lag | grep -v 'NaN$' | grep consumer-group2
kafka_consumergroup_group_max_lag{cluster_name="dev",group="consumer-group2",} 3361.0 

v0.4.2:

λ curl -s http://broker1:8000\?name\[\]\=kafka_consumergroup_group_max_lag | grep -v 'NaN$' | grep consumer-group2
kafka_consumergroup_group_max_lag{cluster_name="dev",group="consumer-group2",} 0.0

Note that each of these consumer groups only consumes a single topic.
And it does not have anything to do with the fact that the first topic is just a single partition and the other one has 16. I have the same situation with other topic.

Metrics are not reset after consumer restart

In my app each consumer gets unique generated client_id. When I restart all consumers, Kafka assigns partitions to their new client_ids, but kafka_consumergroup_group_lag metrics for old client_ids remain in the output until I restart kafka-lag-exporter.

The graph below shows stacked values of kafka_consumergroup_group_lag for each client_id on the first start, after restart of consumers, and then after restart of kafka-lag-exporter:

image

The prometheus query is:

sum(kafka_consumergroup_group_lag) by (client_id)

jaas configuration logged out to console

With default info log level, the provided jaas conf is logged out in plaintext

kafka-lag-exporter | 2019-07-24 10:28:35,231 INFO  c.l.k.KafkaClusterManager$ akka://kafka-lag-exporter/user - Cluster Added: KafkaCluster(kafka-company-db,... org.apache.kafka.common.security.scram.ScramLoginModule required username="!!!!!" password="!!!!!";)

Strimzi watcher fails if cluster name is the same

We have multiple Strimzi clusters in different namespaces, but all have the same cluster name. The Strimzi watcher fails in this case because only the cluster name is used to determine uniqueness. Need to add a differentiator, perhaps $clustername-$namespace?

Prometheus error on second manually configured cluster

When I add a second manually configured cluster initializing the prometheus exporter fails with this error:

java.lang.IllegalArgumentException: Collector already registered that provides name: kafka_partition_latest_offset

Incorrect lag when a consumer group is subscribed to topics with no traffic

If a CG consumes from multiple topics, and if over time some of these topics don't have traffic produced to them - the calculated lag will become huge:
lag_no_traffic

This is a big problem, because if you monitor lag on the group level, you will have wrong info about lag - about lag which basically doesn't exist.

Here is the lag info from Burrow:
lag_no_traffic_burrow

You can see that at the same time when Kafka Lag Exporter metric shows lag, there is no lag.

How to reproduce:

  1. Create two topics with very low retention. TopicA and TopicB.
  2. Start producing to TopicA.
  3. Start a Consumer group which is subscribed to TopicA and TopicB.
  4. Observe lag metric over time and in one point the lag will increase for an order of magnitude on the TopicB. Not sure exactly when this happens, but I think it's connected with retention.ms params on the topic with no traffic, because maybe then Kafka segments are rotated, or something like that.

Wrong metrics

Hey! Sorry to put this here. I know it might not be an issue but I didn't know where else to ask for help :)

First, thank you for the library and helm charts. It's the only one I saw out there to monitor lag, works with Grafana, and has helm charts.

I'm looking at the numbers in the dashboard and nothing makes sense to me. The lag is in hours. It seems that the lag in seconds is incremental. Any idea how I can debug this issue?

Screenshot 2019-06-19 at 6 07 32 PM

Metrics from exporter ``` # HELP process_cpu_seconds_total Total user and system CPU time spent in seconds. # TYPE process_cpu_seconds_total counter process_cpu_seconds_total 84.03 # HELP process_start_time_seconds Start time of the process since unix epoch in seconds. # TYPE process_start_time_seconds gauge process_start_time_seconds 1.560945318082E9 # HELP process_open_fds Number of open file descriptors. # TYPE process_open_fds gauge process_open_fds 69.0 # HELP process_max_fds Maximum number of open file descriptors. # TYPE process_max_fds gauge process_max_fds 65536.0 # HELP process_virtual_memory_bytes Virtual memory size in bytes. # TYPE process_virtual_memory_bytes gauge process_virtual_memory_bytes 3.510411264E9 # HELP process_resident_memory_bytes Resident memory size in bytes. # TYPE process_resident_memory_bytes gauge process_resident_memory_bytes 1.89243392E8 # HELP jvm_memory_bytes_used Used bytes of a given JVM memory area. # TYPE jvm_memory_bytes_used gauge jvm_memory_bytes_used{area="heap",} 1.8873176E7 jvm_memory_bytes_used{area="nonheap",} 5.012472E7 # HELP jvm_memory_bytes_committed Committed (bytes) of a given JVM memory area. # TYPE jvm_memory_bytes_committed gauge jvm_memory_bytes_committed{area="heap",} 6.2980096E7 jvm_memory_bytes_committed{area="nonheap",} 5.24288E7 # HELP jvm_memory_bytes_max Max (bytes) of a given JVM memory area. # TYPE jvm_memory_bytes_max gauge jvm_memory_bytes_max{area="heap",} 9.85268224E8 jvm_memory_bytes_max{area="nonheap",} -1.0 # HELP jvm_memory_bytes_init Initial bytes of a given JVM memory area. # TYPE jvm_memory_bytes_init gauge jvm_memory_bytes_init{area="heap",} 6.5011712E7 jvm_memory_bytes_init{area="nonheap",} 2555904.0 # HELP jvm_memory_pool_bytes_used Used bytes of a given JVM memory pool. # TYPE jvm_memory_pool_bytes_used gauge jvm_memory_pool_bytes_used{pool="Code Cache",} 1.6171648E7 jvm_memory_pool_bytes_used{pool="Metaspace",} 3.0053072E7 jvm_memory_pool_bytes_used{pool="Compressed Class Space",} 3900000.0 jvm_memory_pool_bytes_used{pool="Eden Space",} 453432.0 jvm_memory_pool_bytes_used{pool="Survivor Space",} 347568.0 jvm_memory_pool_bytes_used{pool="Tenured Gen",} 1.8072176E7 # HELP jvm_memory_pool_bytes_committed Committed bytes of a given JVM memory pool. # TYPE jvm_memory_pool_bytes_committed gauge jvm_memory_pool_bytes_committed{pool="Code Cache",} 1.6515072E7 jvm_memory_pool_bytes_committed{pool="Metaspace",} 3.1719424E7 jvm_memory_pool_bytes_committed{pool="Compressed Class Space",} 4194304.0 jvm_memory_pool_bytes_committed{pool="Eden Space",} 1.7432576E7 jvm_memory_pool_bytes_committed{pool="Survivor Space",} 2162688.0 jvm_memory_pool_bytes_committed{pool="Tenured Gen",} 4.3384832E7 # HELP jvm_memory_pool_bytes_max Max bytes of a given JVM memory pool. # TYPE jvm_memory_pool_bytes_max gauge jvm_memory_pool_bytes_max{pool="Code Cache",} 2.5165824E8 jvm_memory_pool_bytes_max{pool="Metaspace",} -1.0 jvm_memory_pool_bytes_max{pool="Compressed Class Space",} 1.073741824E9 jvm_memory_pool_bytes_max{pool="Eden Space",} 2.71843328E8 jvm_memory_pool_bytes_max{pool="Survivor Space",} 3.3947648E7 jvm_memory_pool_bytes_max{pool="Tenured Gen",} 6.79477248E8 # HELP jvm_memory_pool_bytes_init Initial bytes of a given JVM memory pool. # TYPE jvm_memory_pool_bytes_init gauge jvm_memory_pool_bytes_init{pool="Code Cache",} 2555904.0 jvm_memory_pool_bytes_init{pool="Metaspace",} 0.0 jvm_memory_pool_bytes_init{pool="Compressed Class Space",} 0.0 jvm_memory_pool_bytes_init{pool="Eden Space",} 1.7301504E7 jvm_memory_pool_bytes_init{pool="Survivor Space",} 2162688.0 jvm_memory_pool_bytes_init{pool="Tenured Gen",} 4.3384832E7 # HELP kafka_consumergroup_group_max_lag Max group offset lag # TYPE kafka_consumergroup_group_max_lag gauge kafka_consumergroup_group_max_lag{cluster_name="data-dev",group="postgres-orders",} 4490.0 kafka_consumergroup_group_max_lag{cluster_name="data-dev",group="neo4j-orders",} 4490.0 kafka_consumergroup_group_max_lag{cluster_name="data-dev",group="postgres-merchants",} 0.0 kafka_consumergroup_group_max_lag{cluster_name="data-dev",group="connect-topic-to-es",} 4490.0 kafka_consumergroup_group_max_lag{cluster_name="data-dev",group="console-consumer-38236",} 5.0 # HELP kafka_consumergroup_group_offset Last group consumed offset of a partition # TYPE kafka_consumergroup_group_offset gauge kafka_consumergroup_group_offset{cluster_name="data-dev",group="postgres-orders",topic="orders",partition="1",member_host="/10.2.206.77",consumer_id="rdkafka-d0c02698-f920-4d26-a355-e2b7c2480663",client_id="rdkafka",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="connect-topic-to-es",topic="orders",partition="1",member_host="/10.2.183.15",consumer_id="consumer-6-78e7a105-d4dc-43a7-9f45-9267106bf4bd",client_id="consumer-6",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="neo4j-orders",topic="orders",partition="2",member_host="/10.4.248.137",consumer_id="rdkafka-0783a24c-53d6-4403-9c17-55e0583cd7da",client_id="rdkafka",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="connect-topic-to-es",topic="merchants",partition="0",member_host="/10.2.183.15",consumer_id="consumer-5-d97fdf1f-24f7-4773-937b-f49c39fbba90",client_id="consumer-5",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="neo4j-orders",topic="orders",partition="1",member_host="/10.4.248.137",consumer_id="rdkafka-0783a24c-53d6-4403-9c17-55e0583cd7da",client_id="rdkafka",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="neo4j-orders",topic="orders",partition="0",member_host="/10.4.248.137",consumer_id="rdkafka-0783a24c-53d6-4403-9c17-55e0583cd7da",client_id="rdkafka",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="connect-topic-to-es",topic="orders",partition="0",member_host="/10.2.183.15",consumer_id="consumer-5-d97fdf1f-24f7-4773-937b-f49c39fbba90",client_id="consumer-5",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="postgres-orders",topic="orders",partition="2",member_host="/10.2.206.77",consumer_id="rdkafka-f1c2a073-0930-4bed-989c-8d818e64269c",client_id="rdkafka",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="connect-topic-to-es",topic="merchants",partition="1",member_host="/10.2.183.15",consumer_id="consumer-6-78e7a105-d4dc-43a7-9f45-9267106bf4bd",client_id="consumer-6",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="connect-topic-to-es",topic="customers",partition="1",member_host="/10.2.183.15",consumer_id="consumer-6-78e7a105-d4dc-43a7-9f45-9267106bf4bd",client_id="consumer-6",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="connect-topic-to-es",topic="customers",partition="2",member_host="/10.2.183.15",consumer_id="consumer-7-0d1e5e67-009d-433a-930c-3530d9d64369",client_id="consumer-7",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="connect-topic-to-es",topic="customers",partition="0",member_host="/10.2.183.15",consumer_id="consumer-5-d97fdf1f-24f7-4773-937b-f49c39fbba90",client_id="consumer-5",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="postgres-merchants",topic="merchants",partition="1",member_host="/10.2.206.77",consumer_id="rdkafka-ec35d4ce-7612-480a-b5cc-d2e5c6ab553a",client_id="rdkafka",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="connect-topic-to-es",topic="orders",partition="2",member_host="/10.2.183.15",consumer_id="consumer-7-0d1e5e67-009d-433a-930c-3530d9d64369",client_id="consumer-7",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="postgres-merchants",topic="merchants",partition="2",member_host="/10.2.206.77",consumer_id="rdkafka-ec35d4ce-7612-480a-b5cc-d2e5c6ab553a",client_id="rdkafka",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="postgres-merchants",topic="merchants",partition="0",member_host="/10.2.206.77",consumer_id="rdkafka-ec35d4ce-7612-480a-b5cc-d2e5c6ab553a",client_id="rdkafka",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="postgres-orders",topic="orders",partition="0",member_host="/10.2.206.77",consumer_id="rdkafka-0bf8ef54-3c75-4087-812f-a857621b02b3",client_id="rdkafka",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="connect-topic-to-es",topic="merchants",partition="2",member_host="/10.2.183.15",consumer_id="consumer-7-0d1e5e67-009d-433a-930c-3530d9d64369",client_id="consumer-7",} 0.0 kafka_consumergroup_group_offset{cluster_name="data-dev",group="console-consumer-38236",topic="data-dev",partition="0",member_host="/10.0.5.182",consumer_id="consumer-1-badb766f-d7d6-4168-9c97-0cd5cb401a2b",client_id="consumer-1",} 0.0 # HELP jvm_gc_collection_seconds Time spent in a given JVM garbage collector in seconds. # TYPE jvm_gc_collection_seconds summary jvm_gc_collection_seconds_count{gc="Copy",} 147.0 jvm_gc_collection_seconds_sum{gc="Copy",} 0.467 jvm_gc_collection_seconds_count{gc="MarkSweepCompact",} 1.0 jvm_gc_collection_seconds_sum{gc="MarkSweepCompact",} 0.017 # HELP kafka_consumergroup_group_lag Group offset lag of a partition # TYPE kafka_consumergroup_group_lag gauge kafka_consumergroup_group_lag{cluster_name="data-dev",group="postgres-orders",topic="orders",partition="1",member_host="/10.2.206.77",consumer_id="rdkafka-d0c02698-f920-4d26-a355-e2b7c2480663",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="connect-topic-to-es",topic="orders",partition="1",member_host="/10.2.183.15",consumer_id="consumer-6-78e7a105-d4dc-43a7-9f45-9267106bf4bd",client_id="consumer-6",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="neo4j-orders",topic="orders",partition="2",member_host="/10.4.248.137",consumer_id="rdkafka-0783a24c-53d6-4403-9c17-55e0583cd7da",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="connect-topic-to-es",topic="merchants",partition="0",member_host="/10.2.183.15",consumer_id="consumer-5-d97fdf1f-24f7-4773-937b-f49c39fbba90",client_id="consumer-5",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="neo4j-orders",topic="orders",partition="1",member_host="/10.4.248.137",consumer_id="rdkafka-0783a24c-53d6-4403-9c17-55e0583cd7da",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="neo4j-orders",topic="orders",partition="0",member_host="/10.4.248.137",consumer_id="rdkafka-0783a24c-53d6-4403-9c17-55e0583cd7da",client_id="rdkafka",} 4490.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="connect-topic-to-es",topic="orders",partition="0",member_host="/10.2.183.15",consumer_id="consumer-5-d97fdf1f-24f7-4773-937b-f49c39fbba90",client_id="consumer-5",} 4490.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="postgres-orders",topic="orders",partition="2",member_host="/10.2.206.77",consumer_id="rdkafka-f1c2a073-0930-4bed-989c-8d818e64269c",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="connect-topic-to-es",topic="merchants",partition="1",member_host="/10.2.183.15",consumer_id="consumer-6-78e7a105-d4dc-43a7-9f45-9267106bf4bd",client_id="consumer-6",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="connect-topic-to-es",topic="customers",partition="1",member_host="/10.2.183.15",consumer_id="consumer-6-78e7a105-d4dc-43a7-9f45-9267106bf4bd",client_id="consumer-6",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="connect-topic-to-es",topic="customers",partition="2",member_host="/10.2.183.15",consumer_id="consumer-7-0d1e5e67-009d-433a-930c-3530d9d64369",client_id="consumer-7",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="connect-topic-to-es",topic="customers",partition="0",member_host="/10.2.183.15",consumer_id="consumer-5-d97fdf1f-24f7-4773-937b-f49c39fbba90",client_id="consumer-5",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="postgres-merchants",topic="merchants",partition="1",member_host="/10.2.206.77",consumer_id="rdkafka-ec35d4ce-7612-480a-b5cc-d2e5c6ab553a",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="connect-topic-to-es",topic="orders",partition="2",member_host="/10.2.183.15",consumer_id="consumer-7-0d1e5e67-009d-433a-930c-3530d9d64369",client_id="consumer-7",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="postgres-merchants",topic="merchants",partition="2",member_host="/10.2.206.77",consumer_id="rdkafka-ec35d4ce-7612-480a-b5cc-d2e5c6ab553a",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="postgres-merchants",topic="merchants",partition="0",member_host="/10.2.206.77",consumer_id="rdkafka-ec35d4ce-7612-480a-b5cc-d2e5c6ab553a",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="postgres-orders",topic="orders",partition="0",member_host="/10.2.206.77",consumer_id="rdkafka-0bf8ef54-3c75-4087-812f-a857621b02b3",client_id="rdkafka",} 4490.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="connect-topic-to-es",topic="merchants",partition="2",member_host="/10.2.183.15",consumer_id="consumer-7-0d1e5e67-009d-433a-930c-3530d9d64369",client_id="consumer-7",} 0.0 kafka_consumergroup_group_lag{cluster_name="data-dev",group="console-consumer-38236",topic="data-dev",partition="0",member_host="/10.0.5.182",consumer_id="consumer-1-badb766f-d7d6-4168-9c97-0cd5cb401a2b",client_id="consumer-1",} 5.0 # HELP kafka_consumergroup_group_lag_seconds Group time lag of a partition # TYPE kafka_consumergroup_group_lag_seconds gauge kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="postgres-orders",topic="orders",partition="1",member_host="/10.2.206.77",consumer_id="rdkafka-d0c02698-f920-4d26-a355-e2b7c2480663",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="connect-topic-to-es",topic="orders",partition="1",member_host="/10.2.183.15",consumer_id="consumer-6-78e7a105-d4dc-43a7-9f45-9267106bf4bd",client_id="consumer-6",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="neo4j-orders",topic="orders",partition="2",member_host="/10.4.248.137",consumer_id="rdkafka-0783a24c-53d6-4403-9c17-55e0583cd7da",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="connect-topic-to-es",topic="merchants",partition="0",member_host="/10.2.183.15",consumer_id="consumer-5-d97fdf1f-24f7-4773-937b-f49c39fbba90",client_id="consumer-5",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="neo4j-orders",topic="orders",partition="1",member_host="/10.4.248.137",consumer_id="rdkafka-0783a24c-53d6-4403-9c17-55e0583cd7da",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="neo4j-orders",topic="orders",partition="0",member_host="/10.4.248.137",consumer_id="rdkafka-0783a24c-53d6-4403-9c17-55e0583cd7da",client_id="rdkafka",} 8379.677 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="connect-topic-to-es",topic="orders",partition="0",member_host="/10.2.183.15",consumer_id="consumer-5-d97fdf1f-24f7-4773-937b-f49c39fbba90",client_id="consumer-5",} 8379.677 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="postgres-orders",topic="orders",partition="2",member_host="/10.2.206.77",consumer_id="rdkafka-f1c2a073-0930-4bed-989c-8d818e64269c",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="connect-topic-to-es",topic="merchants",partition="1",member_host="/10.2.183.15",consumer_id="consumer-6-78e7a105-d4dc-43a7-9f45-9267106bf4bd",client_id="consumer-6",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="connect-topic-to-es",topic="customers",partition="1",member_host="/10.2.183.15",consumer_id="consumer-6-78e7a105-d4dc-43a7-9f45-9267106bf4bd",client_id="consumer-6",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="connect-topic-to-es",topic="customers",partition="2",member_host="/10.2.183.15",consumer_id="consumer-7-0d1e5e67-009d-433a-930c-3530d9d64369",client_id="consumer-7",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="connect-topic-to-es",topic="customers",partition="0",member_host="/10.2.183.15",consumer_id="consumer-5-d97fdf1f-24f7-4773-937b-f49c39fbba90",client_id="consumer-5",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="postgres-merchants",topic="merchants",partition="1",member_host="/10.2.206.77",consumer_id="rdkafka-ec35d4ce-7612-480a-b5cc-d2e5c6ab553a",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="connect-topic-to-es",topic="orders",partition="2",member_host="/10.2.183.15",consumer_id="consumer-7-0d1e5e67-009d-433a-930c-3530d9d64369",client_id="consumer-7",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="postgres-merchants",topic="merchants",partition="2",member_host="/10.2.206.77",consumer_id="rdkafka-ec35d4ce-7612-480a-b5cc-d2e5c6ab553a",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="postgres-merchants",topic="merchants",partition="0",member_host="/10.2.206.77",consumer_id="rdkafka-ec35d4ce-7612-480a-b5cc-d2e5c6ab553a",client_id="rdkafka",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="postgres-orders",topic="orders",partition="0",member_host="/10.2.206.77",consumer_id="rdkafka-0bf8ef54-3c75-4087-812f-a857621b02b3",client_id="rdkafka",} 8379.677 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="connect-topic-to-es",topic="merchants",partition="2",member_host="/10.2.183.15",consumer_id="consumer-7-0d1e5e67-009d-433a-930c-3530d9d64369",client_id="consumer-7",} 0.0 kafka_consumergroup_group_lag_seconds{cluster_name="data-dev",group="console-consumer-38236",topic="data-dev",partition="0",member_host="/10.0.5.182",consumer_id="consumer-1-badb766f-d7d6-4168-9c97-0cd5cb401a2b",client_id="consumer-1",} 44952.205 # HELP jvm_threads_current Current thread count of a JVM # TYPE jvm_threads_current gauge jvm_threads_current 27.0 # HELP jvm_threads_daemon Daemon thread count of a JVM # TYPE jvm_threads_daemon gauge jvm_threads_daemon 5.0 # HELP jvm_threads_peak Peak thread count of a JVM # TYPE jvm_threads_peak gauge jvm_threads_peak 33.0 # HELP jvm_threads_started_total Started thread count of a JVM # TYPE jvm_threads_started_total counter jvm_threads_started_total 740.0 # HELP jvm_threads_deadlocked Cycles of JVM-threads that are in deadlock waiting to acquire object monitors or ownable synchronizers # TYPE jvm_threads_deadlocked gauge jvm_threads_deadlocked 0.0 # HELP jvm_threads_deadlocked_monitor Cycles of JVM-threads that are in deadlock waiting to acquire object monitors # TYPE jvm_threads_deadlocked_monitor gauge jvm_threads_deadlocked_monitor 0.0 # HELP jvm_threads_state Current count of threads by state # TYPE jvm_threads_state gauge jvm_threads_state{state="RUNNABLE",} 5.0 jvm_threads_state{state="TIMED_WAITING",} 10.0 jvm_threads_state{state="TERMINATED",} 0.0 jvm_threads_state{state="NEW",} 0.0 jvm_threads_state{state="BLOCKED",} 0.0 jvm_threads_state{state="WAITING",} 12.0 # HELP jvm_classes_loaded The number of classes that are currently loaded in the JVM # TYPE jvm_classes_loaded gauge jvm_classes_loaded 4550.0 # HELP jvm_classes_loaded_total The total number of classes that have been loaded since the JVM has started execution # TYPE jvm_classes_loaded_total counter jvm_classes_loaded_total 4550.0 # HELP jvm_classes_unloaded_total The total number of classes that have been unloaded since the JVM has started execution # TYPE jvm_classes_unloaded_total counter jvm_classes_unloaded_total 0.0 # HELP kafka_partition_latest_offset Latest offset of a partition # TYPE kafka_partition_latest_offset gauge kafka_partition_latest_offset{cluster_name="data-dev",topic="customers",partition="2",} 0.0 kafka_partition_latest_offset{cluster_name="data-dev",topic="data-dev",partition="0",} 5.0 kafka_partition_latest_offset{cluster_name="data-dev",topic="merchants",partition="0",} 0.0 kafka_partition_latest_offset{cluster_name="data-dev",topic="customers",partition="1",} 0.0 kafka_partition_latest_offset{cluster_name="data-dev",topic="customers",partition="0",} 0.0 kafka_partition_latest_offset{cluster_name="data-dev",topic="merchants",partition="1",} 0.0 kafka_partition_latest_offset{cluster_name="data-dev",topic="orders",partition="2",} 0.0 kafka_partition_latest_offset{cluster_name="data-dev",topic="orders",partition="1",} 0.0 kafka_partition_latest_offset{cluster_name="data-dev",topic="orders",partition="0",} 4490.0 kafka_partition_latest_offset{cluster_name="data-dev",topic="merchants",partition="2",} 0.0 # HELP jvm_buffer_pool_used_bytes Used bytes of a given JVM buffer pool. # TYPE jvm_buffer_pool_used_bytes gauge jvm_buffer_pool_used_bytes{pool="direct",} 49677.0 jvm_buffer_pool_used_bytes{pool="mapped",} 0.0 # HELP jvm_buffer_pool_capacity_bytes Bytes capacity of a given JVM buffer pool. # TYPE jvm_buffer_pool_capacity_bytes gauge jvm_buffer_pool_capacity_bytes{pool="direct",} 49677.0 jvm_buffer_pool_capacity_bytes{pool="mapped",} 0.0 # HELP jvm_buffer_pool_used_buffers Used buffers of a given JVM buffer pool. # TYPE jvm_buffer_pool_used_buffers gauge jvm_buffer_pool_used_buffers{pool="direct",} 43.0 jvm_buffer_pool_used_buffers{pool="mapped",} 0.0 # HELP kafka_consumergroup_group_max_lag_seconds Max group time lag # TYPE kafka_consumergroup_group_max_lag_seconds gauge kafka_consumergroup_group_max_lag_seconds{cluster_name="data-dev",group="postgres-orders",} 8379.677 kafka_consumergroup_group_max_lag_seconds{cluster_name="data-dev",group="neo4j-orders",} 8379.677 kafka_consumergroup_group_max_lag_seconds{cluster_name="data-dev",group="postgres-merchants",} 0.0 kafka_consumergroup_group_max_lag_seconds{cluster_name="data-dev",group="connect-topic-to-es",} 8379.677 kafka_consumergroup_group_max_lag_seconds{cluster_name="data-dev",group="console-consumer-38236",} 44952.205 # HELP jvm_memory_pool_allocated_bytes_total Total bytes allocated in a given JVM memory pool. Only updated after GC, not continuously. # TYPE jvm_memory_pool_allocated_bytes_total counter jvm_memory_pool_allocated_bytes_total{pool="Eden Space",} 2.510280528E9 jvm_memory_pool_allocated_bytes_total{pool="Code Cache",} 1.6849984E7 jvm_memory_pool_allocated_bytes_total{pool="Compressed Class Space",} 3900000.0 jvm_memory_pool_allocated_bytes_total{pool="Metaspace",} 3.0053072E7 jvm_memory_pool_allocated_bytes_total{pool="Tenured Gen",} 1.8072176E7 jvm_memory_pool_allocated_bytes_total{pool="Survivor Space",} 4902760.0 # HELP jvm_info JVM version info # TYPE jvm_info gauge jvm_info{version="1.8.0_191-b12",vendor="Oracle Corporation",runtime="OpenJDK Runtime Environment",} 1.0 ```

Continue to calculate lag for inactive groups for a configurable timespan

Inspired by discussion in #63

Add a feature that continues to calculate consumer group lag for a group after it's no longer active. Today, we will immediately evict metrics for groups that no longer exist. We detect that a group has been removed by comparing the list of groups returned to the list returned in the last poll. Instead of removing metrics immediately, when we discover that groups no longer exist (they're no longer returned when we retrieve group metadata), we will continue to calculate lag for their last reported partition subscription. When a group is detected as removed it will be added with a timestamp to a removal list that will be cleaned up after each poll. If a group in the removal list exceed a configured time span then it will be removed. If the group becomes active again then the group is removed from the removal list. A default of 30 minutes would be a good value to start with.

How to run / configure outside kubernetes/strimzi

Hello,

I am trying to run the kafka-lag-exporter as a classic stand alone application.
To do that, I've built the project with "sbt clean dist" which has produced a zip file under target/universal. After unzipping I am able to start the exporter with "./bin/kafka-lag-exporter".
My problem is that it is not very clear for me how you can configure a static cluster definition, the documentation stated that it is possible to create an application.conf file but I could not find any documentation on its structure.

Thanks for your help!

Julien

Implement backoff strategy for Kafka connections in Kafka Lag Exporter

When installed in a fresh cluster Kafka Lag Exporter will fail if configured/discovered Kafka clusters cannot be reached. Kafka Lag Exporter is configured to automatically discover Strimzi Kafka clusters by watching for Kafka CRD’s, but at first install it may detect the Kafka CRD before Kafka has finished coming online, and fail. It won’t attempt to connect again.

The workaround right now is to delete the pod and let its deployment recreate it once the Kafka clusters are online. Since Kafka Lag Exporter can support multiple clusters I would like to add a backoff strategy to connection attempts so it will try to connect to clusters indefinitely.

Logs for kafka-lag-exporter pod:

2019-01-17 13:26:33,410 WARN  org.apache.kafka.clients.ClientUtils  - Couldn't resolve server pipelines-strimzi-kafka-bootstrap.lightbend:9092 from bootstrap.servers as DNS resolution failed for pipelines-strimzi-kafka-bootstrap.lightbend
2019-01-17 13:26:33,423 ERROR akka.actor.OneForOneStrategy akka://kafkalagexporterapp/user/consumer-group-collector-pipelines-strimzi - Failed create new KafkaAdminClient
akka.actor.ActorInitializationException: akka://kafkalagexporterapp/user/consumer-group-collector-pipelines-strimzi: exception during creation
        at akka.actor.ActorInitializationException$.apply(Actor.scala:193)
        at akka.actor.ActorCell.create(ActorCell.scala:669)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:523)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:545)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:283)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.KafkaException: Failed create new KafkaAdminClient
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:378)
        at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:54)
        at com.lightbend.kafkalagexporter.KafkaClient$.com$lightbend$kafkalagexporter$KafkaClient$$createAdminClient(KafkaClient.scala:42)
        at com.lightbend.kafkalagexporter.KafkaClient.<init>(KafkaClient.scala:71)
        at com.lightbend.kafkalagexporter.KafkaClient$.apply(KafkaClient.scala:18)
        at com.lightbend.kafkalagexporter.MainApp$.$anonfun$clientCreator$1(MainApp.scala:26)
        at com.lightbend.kafkalagexporter.ConsumerGroupCollector$.$anonfun$init$1(ConsumerGroupCollector.scala:47)
        at akka.actor.typed.Behavior$DeferredBehavior$$anon$1.apply(Behavior.scala:219)
        at akka.actor.typed.Behavior$.start(Behavior.scala:300)
        at akka.actor.typed.internal.adapter.ActorAdapter.start(ActorAdapter.scala:145)
        at akka.actor.typed.internal.adapter.ActorAdapter.preStart(ActorAdapter.scala:140)
        at akka.actor.Actor.aroundPreStart(Actor.scala:528)
        at akka.actor.Actor.aroundPreStart$(Actor.scala:528)
        at akka.actor.typed.internal.adapter.ActorAdapter.aroundPreStart(ActorAdapter.scala:21)
        at akka.actor.ActorCell.create(ActorCell.scala:652)
        ... 9 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:86)
        at org.apache.kafka.clients.admin.KafkaAdminClient.<init>(KafkaAdminClient.java:417)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:371)
        ... 23 common frames omitted

Factor out Spark Event Exporter into separate projects

Split out spark event exporter into potentially twoone separate project

  1. spark-committer - Only commits offsets to a defined group.id. This complements kafka lag exporter, because when used with a Spark app we can report lag the same way we do any other app.
  2. spark-event-exporter - Simplified version of current exporter which only passes thru event data as Spark custom metrics, but does not calculate lag (use spark-committer for that).

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.