
kt's Introduction

kt - a Kafka tool that likes JSON

Some reasons why you might be interested:

  • Consume messages on specific partitions between specific offsets.
  • Display topic information (e.g., with partition offset and leader info).
  • Modify consumer group offsets (e.g., resetting or manually setting offsets per topic and per partition).
  • JSON output for easy consumption with tools like kp or jq.
  • JSON input to facilitate automation via tools like jsonify.
  • Configure brokers, topic, Kafka version and authentication via the environment variables KT_BROKERS, KT_TOPIC, KT_KAFKA_VERSION and KT_AUTH (see the sketch after this list).
  • Fast start up time.
  • No buffering of output.
  • Binary keys and payloads can be passed and presented in base64 or hex encoding.
  • Support for TLS authentication.
  • Basic cluster admin functions: Create & delete topics.
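
For instance, a minimal sketch of driving kt entirely via environment variables (broker addresses are illustrative; the -offsets syntax follows the consume examples below):

$ export KT_BROKERS=kafka1:9092,kafka2:9092
$ export KT_TOPIC=actor-news
$ kt consume -offsets all=newest-1: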

Note

I'm not using kt actively myself anymore, so if you think it's lacking some feature - please let me know by creating an issue.

Examples

Read details about topics that match a regex
$ kt topic -filter news -partitions
{
  "name": "actor-news",
  "partitions": [
    {
      "id": 0,
      "oldest": 0,
      "newest": 0
    }
  ]
}
Produce messages
$ echo 'Alice wins Oscar' | kt produce -topic actor-news -literal
{
  "count": 1,
  "partition": 0,
  "startOffset": 0
}
$ echo 'Bob wins Oscar' | kt produce -topic actor-news -literal
{
  "count": 1,
  "partition": 0,
  "startOffset": 0
}
$ for i in {6..9} ; do echo Bourne sequel $i in production. | kt produce -topic actor-news -literal ;done
{
  "count": 1,
  "partition": 0,
  "startOffset": 1
}
{
  "count": 1,
  "partition": 0,
  "startOffset": 2
}
{
  "count": 1,
  "partition": 0,
  "startOffset": 3
}
{
  "count": 1,
  "partition": 0,
  "startOffset": 4
}
Or pass in a JSON object to control key, value and partition
$ echo '{"value": "Terminator terminated", "key": "Arni", "partition": 0}' | kt produce -topic actor-news
{
  "count": 1,
  "partition": 0,
  "startOffset": 5
}
Read messages at specific offsets on specific partitions
$ kt consume -topic actor-news -offsets 0=1:2
{
  "partition": 0,
  "offset": 1,
  "key": "",
  "value": "Bourne sequel 6 in production.",
  "timestamp": "1970-01-01T00:59:59.999+01:00"
}
{
  "partition": 0,
  "offset": 2,
  "key": "",
  "value": "Bourne sequel 7 in production.",
  "timestamp": "1970-01-01T00:59:59.999+01:00"
}
Follow a topic, starting relative to newest offset
$ kt consume -topic actor-news -offsets all=newest-1:
{
  "partition": 0,
  "offset": 4,
  "key": "",
  "value": "Bourne sequel 9 in production.",
  "timestamp": "1970-01-01T00:59:59.999+01:00"
}
{
  "partition": 0,
  "offset": 5,
  "key": "Arni",
  "value": "Terminator terminated",
  "timestamp": "1970-01-01T00:59:59.999+01:00"
}
^C
received interrupt - shutting down
shutting down partition consumer for partition 0
View offsets for a given consumer group
$ kt group -group enews -topic actor-news -partitions 0
found 1 groups
found 1 topics
{
  "name": "enews",
  "topic": "actor-news",
  "offsets": [
    {
      "partition": 0,
      "offset": 6,
      "lag": 0
    }
  ]
}
Change consumer group offset
$ kt group -group enews -topic actor-news -partitions 0 -reset 1
found 1 groups
found 1 topics
{
  "name": "enews",
  "topic": "actor-news",
  "offsets": [
    {
      "partition": 0,
      "offset": 1,
      "lag": 5
    }
  ]
}
$ kt group -group enews -topic actor-news -partitions 0
found 1 groups
found 1 topics
{
  "name": "enews",
  "topic": "actor-news",
  "offsets": [
    {
      "partition": 0,
      "offset": 1,
      "lag": 5
    }
  ]
}
Create and delete a topic
$ kt admin -createtopic morenews -topicdetail <(jsonify =NumPartitions 1 =ReplicationFactor 1)
$ kt topic -filter news
{
  "name": "morenews"
}
$ kt admin -deletetopic morenews
$ kt topic -filter news
Change broker address via environment variable
$ export KT_BROKERS=brokers.kafka:9092
$ kt <command> <option>

Installation

You can download kt via the Releases section.

Alternatively, install it the usual way via the go tool, for example:

$ go install github.com/fgeller/kt/v14@latest

Or via Homebrew on macOS:

$ brew tap fgeller/tap
$ brew install kt

Docker

@Paxa maintains an image to run kt in a Docker environment - thanks!

For more information: https://github.com/Paxa/kt

Usage:

$ kt -help
kt is a tool for Kafka.

Usage:

        kt command [arguments]

The commands are:

        consume        consume messages.
        produce        produce messages.
        topic          topic information.
        group          consumer group information and modification.
        admin          basic cluster administration.

Use "kt [command] -help" for for information about the command.

Authentication:

Authentication with Kafka can be configured via a JSON file.
You can set the file name via an "-auth" flag to each command or
set it via the environment variable KT_AUTH.

Authentication / Encryption

Authentication configuration is possible via a JSON file. You indicate the mode of authentication you need and provide additional information as required for that mode. You pass the path to your configuration file via the -auth flag to each command individually, or set it via the environment variable KT_AUTH.
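
For example, a minimal sketch of both ways to point kt at a configuration file (the path is illustrative):

$ kt consume -topic actor-news -auth /some/dir/kt-auth.json
$ # or, equivalently:
$ export KT_AUTH=/some/dir/kt-auth.json
$ kt consume -topic actor-news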

TLS

Required fields:

  • mode: This needs to be set to TLS
  • client-certificate: Path to your certificate
  • client-certificate-key: Path to your certificate key
  • ca-certificate: Path to your CA certificate

Example of an authentication configuration that is used for the system tests:

{
    "mode": "TLS",
    "client-certificate": "test-secrets/kt-test.crt",
    "client-certificate-key": "test-secrets/kt-test.key",
    "ca-certificate": "test-secrets/snakeoil-ca-1.crt"
}

If any certificate or key path is simply the name of a file, it is assumed to be in the same directory as the auth file itself. For example, if the path to the auth file is /some/dir/kt-auth.json, then a "client-certificate": "kt-test.crt" will be qualified to /some/dir/kt-test.crt.
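
A sketch of such a layout, using the file names from the example above:

$ ls /some/dir
kt-auth.json  kt-test.crt  kt-test.key  snakeoil-ca-1.crt
$ export KT_AUTH=/some/dir/kt-auth.json
$ kt topic -filter news   # "client-certificate": "kt-test.crt" resolves to /some/dir/kt-test.crt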

TLS one-way

Required fields:

  • mode: This needs to be set to TLS-1way

Optional fields:

  • ca-certificate: Path to your CA certificate

Example:

{
    "mode": "TLS-1way"
}

Other modes

Please create an issue with details for the mode that you need.

kt's People

Contributors

bo0rsh201, contrun, disq, ducnt114, ebati, echojc, empeje, fgeller, henzenvandijk, henzenvd, jackhopner, jackyzhen, jvansanten, mponton, pd, rogpeppe, rwaweber, swagile1, testwill, tyranron


kt's Issues

Consuming null message value should be consistent with producing

In reference to issue #24, while consuming a null-value message from Kafka, the output JSON should contain

"value": null

currently it outputs

"value": ""

This would be convenient for doing a data backup or a data transfer between topics. A current workaround is to use the jq tool to replace the empty string with null, but it only works if the topic is not supposed to contain empty strings.
Here is the jq filter:

jq -c '{partition:.partition,key:.key,value:(if (.value|length) > 0 then .value else null end)}'
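
For example, when taking a backup, the filter might be applied like this (topic name and output file are illustrative; the oldest anchor is assumed valid by analogy with newest in the README examples):

$ kt consume -topic actor-news -offsets all=oldest: \
    | jq -c '{partition: .partition, key: .key, value: (if (.value|length) > 0 then .value else null end)}' \
    > backup.jsonl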

Unable to run kt: command not found

Hi,

I tried the steps as provided in the README, i.e. go get -u github.com/fgeller/kt.
However, I am unable to run the tool: I see the message "bash: kt: command not found" when typing kt in a terminal (macOS).
Please help.
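
A likely cause is that the directory the go tool installs binaries into is not on the PATH; a sketch of the usual fix, assuming a default Go setup:

$ export PATH="$PATH:$(go env GOPATH)/bin"
$ kt -help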

Allow producing `null` payload messages

As per https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction you can "delete" a message from a topic with log compaction on by sending a message with a null payload.

I tried producing such a message using kt by specifying json

{"key": "1", "value": null, "partition": 0}

but the message produced by kt has an empty string as its payload, not null.

On a side note, the kt consume command doesn't seem to differentiate between "" and null message payloads for display purposes.

Question: Understanding Resetting Partition Offsets

hi there,

I was pointed to this project because I was working on one very similar. The only problem is, I can't seem to get the reset working like yours... yet it looks like I'm using similar, if not identical, code. I was wondering, if you had the time, whether a second set of eyes (and a more knowledgeable one at that!) could assist me in pointing out where exactly I am missing the functionality, and why?

Thanks ahead of time for any help you may provide!

func setConsumerGroupOffset(cmd *cobra.Command, args []string) {
	var setOffset int64
	var err error
	switch offsetString {
	case "oldest":
		setOffset = sarama.OffsetOldest
	case "newest":
		setOffset = sarama.OffsetNewest
	default:
		setOffset, err = strconv.ParseInt(offsetString, 10, 64)
		if err != nil || setOffset < 0 {
			log.Printf("[ERROR] Can't Parse/Use Given Offset: %v (err: %s)", setOffset, err)
		}
	}

	clientConfig := sarama.NewConfig()
	clientConfig.ClientID = "kafkactl"
	table := uitable.New()
	table.Wrap = true

	clientConn, err := sarama.NewClient(kafkactlCfg.brokers, clientConfig)
	if err != nil {
		log.Fatalf("[ERROR] Unable to connect to Kafka Brokers (err: %s)", err)
	}

	myPartitions, err := getPartitionsByTopic(clientConn, topic)
	if err != nil {
		return
	}
	defer clientConn.Close()

	om, err := sarama.NewOffsetManagerFromClient(consumerGroup, clientConn)
	if err != nil {
		log.Fatalf("[ERROR] Unable to Manage OFfsets (err: %s)", err)
	}
	defer om.Close()

	for _, partition := range myPartitions {
		pom, err := om.ManagePartition(topic, partition)
		if err != nil {
			log.Fatalf("[ERROR] Unable to connect to Offset Manager for Partition %v", partition)
		}
		defer pom.Close()

		pomOffset, pomMetadata := pom.NextOffset()
		log.Printf("[DEBUG] Offset: %v, Metadata: %s", pomOffset, pomMetadata)
		if setOffset == sarama.OffsetNewest || setOffset == sarama.OffsetOldest {
			setOffset, err = clientConn.GetOffset(topic, partition, setOffset)
			if err != nil {
				log.Fatalf("[ERROR] Unable to retrieve offset for topic/partition: %s/%v, err: %s", topic, partition, err)
			}
		}
		log.Printf("Setting offset for topic/partition: %s/%v, offset: %v", topic, partition, setOffset)
		pom.MarkOffset(setOffset, "")
	}
}

There are no errors in the above code. However, the partitions never get reset in lag. I tried your program and it worked perfectly - not sure where I'm going wrong!

Thanks again,
jbkc85

`exec format error` when running Linux 1.0.0 binary

On Ubuntu 15.04 and Ubuntu 15.10, the following occurs when trying to download and run the Linux 1.0.0 release:

$ wget https://github.com/fgeller/kt/releases/download/v1.0.0/kt-1.0.0-linux.xz
$ xz -d kt-1.0.0-linux.xz
$ ./kt-1.0.0-linux
zsh: exec format error: ./kt-1.0.0-linux
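
exec format error typically means the binary targets a different architecture than the machine, or the download was corrupted; a quick way to check (output shown is illustrative):

$ uname -m
x86_64
$ file ./kt-1.0.0-linux
./kt-1.0.0-linux: ELF 64-bit LSB executable, x86-64, ...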

Not able to change the offset

kt group -group enews -topic actor-news -partitions 0
found 1 groups
found 1 topics
failed to read partition offset for topic=actor-news partition=0 err=EOF

I am using Kafka version 2.10-0.10.0.1.

I don't know why. Can you please help me?

In the kafka logs it shows this error

ERROR Closing socket for 127.0.0.1:9092-127.0.0.1:59373 because of error (kafka.network.Processor)
kafka.network.InvalidRequestException: Error getting request for apiKey: 2 and apiVersion: 1
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
at kafka.network.RequestChannel$Request.(RequestChannel.scala:87)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
at kafka.network.Processor.run(SocketServer.scala:413)

If I update my Kafka to kafka_2.10-0.10.1.0 the command works. But if I reset the consumer using
kt group -group enews -topic actor-news -partitions 0 -reset 1 and then start a consumer, it doesn't start sending me the data from offset 1. Can you please explain how to reset it?

Consumer hangs when trying to consume old offsets on a 0.10 broker

When trying to consume from a topic using offsets older than a certain point, the consumer will hang until you kill it. It appears to work fine using OffsetNewest or an absolute value above some point. When consuming a topic with 155 log entries, starting with an offset anywhere under roughly 30 would hang; anything above that would work as normal.

This appears to only affect consuming against 0.10 clusters.

Unreliable behaviour for `kt topic` on some topics

On v9.2.0 (a990512) I'm getting this for some topic on a 0.10 cluster.

$ kt topic -brokers obfuscated:9092 -partitions --filter obfuscated.topic
$ kt topic -brokers obfuscated:9092 -partitions --filter obfuscated.topic
{"name":"obfuscated.topic","partitions":[{"id":0,"oldest":0,"newest":17}]}
$ kt topic -brokers obfuscated:9092 -partitions --filter obfuscated.topic
{"name":"obfuscated.topic","partitions":[{"id":0,"oldest":0,"newest":17}]}
$ kt topic -brokers obfuscated:9092 -partitions --filter obfuscated.topic
$

Delay creating handlers for interrupt signals until after client connects to Kafka

Thanks for making and maintaining kt! 😄

At the moment kt starts capturing interrupt signals before the client connects, which means that ^C while attempting to connect to Kafka will stall kt until the server responds. If the server doesn't respond (e.g., -brokers 8.8.8.8) kt will hang until the connection attempt times out.

You handle this in the topic command by spinning off a goroutine that waits for the interrupt channel to close but the other commands don't have that at the moment.

I'm also not sure that's the right way to handle it? That particular goroutine calls failf, which calls os.Exit, which means deferred functions don't get executed, in which case, is that the same as using the default interrupt handler?

group displays incorrect lag

when the offset is sarama.OffsetNewest (-1), the lag is computed incorrectly (e.g., a partition whose newest offset resolves to 6 shows a lag of 7 instead of 0):

resolve(sarama.OffsetNewest) - offset
= resolve(sarama.OffsetNewest) - sarama.OffsetNewest
= resolve(sarama.OffsetNewest) - (-1)
= resolve(sarama.OffsetNewest) + 1

How to set group for consume?

As far as I can see, kt consume works without a group, so every time it will start from the beginning.

Is there a way to specify consumer group?

Add JSON output format

It'd be cool if there's an option to use a JSON output format compatible with https://github.com/echojc/kp so that we can pipe output from kt into kp preserving partitioning and keys, useful for testing, debugging, and transferring data between (test) clusters.

Failed to send message results in kt not responding

Failed to send message, quitting. err=kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date. ^C2016/10/13 09:38:55 Received interrupt - shutting down...

When failing to produce a message, if you try to kill kt (ctrl + c) it just hangs forever.

support specifying partitions when consuming

0:1-13 should read from partition 0 only and read messages between offsets 1 and 13

:1-13 should read from all partitions and read messages between offsets 1 and 13

0:1-13,6:23-48 should read from partition 0 between 1 and 13 and partition 6 between 23 and 48.

0:1- should read from partition 0 between 1 and whatever comes after

Document `-1-` to consume from latest offset

-offsets -1- to consume all partitions from the end

-offsets 0:-1- to consume partition 0 from the end

The current documentation implies that -1 will consume up to message at offset 1 and doesn't offer an explanation for -1-.

Avro / Schema Registry support

Avro + (optionally) Schema Registry support would be an awesome addition to kt for those of us who can't afford fat JSON payloads.

The Confluent Platform has two command-line tools, kafka-avro-console-producer and kafka-avro-console-consumer, that can either take the schema directly on the command line or connect to the Schema Registry. Similar functionality in kt would be really useful!

E.g.

$ ./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic test \
         --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
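
For reference, the matching consumer side looks roughly like this (flag names per the Confluent tooling; versions may differ):

$ ./bin/kafka-avro-console-consumer \
         --bootstrap-server localhost:9092 --topic test \
         --property schema.registry.url=http://localhost:8081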

pretty print output by default

  • pretty print output by default
  • don't pretty print, when stdout is not a tty
  • add a flag to each command to control/force pretty printing

as triggered by #49
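
Until such a flag exists, a workaround is to pipe through jq, which pretty-prints by default:

$ kt topic -filter news -partitions | jq .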

Panic when a message gets deleted (using a retention period) while consuming

To reproduce:

  • Start consuming from a topic
  • While consuming, do bin/kafka-configs.sh --zookeeper dockerhost:2181 --alter --entity-type topics --add-config 'retention.ms=1000, cleanup.policy=delete' --entity "thetopicname"
  • After a while, when messages get deleted, you'll get the below error.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x1277f0f]

goroutine 73 [running]:
main.(*consumeCmd).partitionLoop(0xc4200a0210, 0xc420313d40, 0x147f200, 0xc420314080, 0x7, 0x7fffffffffffffff)
	/private/var/folders/qk/xbjs71jn3pg85_vzg1k2vgf40000gn/T/nix-build-kt-11.1.0.drv-0/go/src/github.com/fgeller/kt/consume.go:396 +0x32f
main.(*consumeCmd).consumePartition(0xc4200a0210, 0xc420313d40, 0xc400000007)
	/private/var/folders/qk/xbjs71jn3pg85_vzg1k2vgf40000gn/T/nix-build-kt-11.1.0.drv-0/go/src/github.com/fgeller/kt/consume.go:352 +0x51b
main.(*consumeCmd).consume.func1(0xc420305c10, 0xc4200a0210, 0xc420313d40, 0x7)
	/private/var/folders/qk/xbjs71jn3pg85_vzg1k2vgf40000gn/T/nix-build-kt-11.1.0.drv-0/go/src/github.com/fgeller/kt/consume.go:318 +0x63
created by main.(*consumeCmd).consume
	/private/var/folders/qk/xbjs71jn3pg85_vzg1k2vgf40000gn/T/nix-build-kt-11.1.0.drv-0/go/src/github.com/fgeller/kt/consume.go:318 +0xea

Partitioner argument is not read

The -partitioner option for the producer doesn't seem to have any effect. It seems it is not passed properly to the cmd object in this section of the code:

cmd.batch = args.batch
cmd.timeout = args.timeout
cmd.verbose = args.verbose
cmd.pretty = args.pretty
cmd.literal = args.literal
cmd.partition = int32(args.partition)
cmd.version = kafkaVersion(args.version)
cmd.bufferSize = args.bufferSize
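// Note: no assignment like cmd.partitioner = args.partitioner appears in the
// snippet above (field names inferred from the pattern), which would explain
// why the flag has no effect.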

`kt group` doesn't show consumer groups for Kafka v0.10.0.1

Hey Felix :)

We'd like to use kt group to view and manipulate offsets, however the command doesn't show any of our groups on Kafka v0.10.0.1

I've tested the exact same consumer code on Kafka 0.10.1.0 and in that case kt group shows the consumer groups as expected.

Note that we use sarama to create the consumer and mark offsets directly using sarama.PartitionOffsetManager.MarkOffset. We're not using sarama-cluster.

Do you know any reason why kt group wouldn't show all consumer groups for Kafka versions older than 0.10.1.0?

Allow reading last X messages

Like tail -nX. Even nicer would be tail -f -nX, i.e., read from the last X messages and keep going.

Not sure what the syntax should be. Usually you'd use negative numbers, but that overlaps with your current usage of - for ranges. My inclination would be to change your range operator to something else, because negative numbers for reverse offsets feel really nice - but yeah, not sure if you want to change that.
