kt's Issues

Panic when a message gets deleted (using a retention period) while consuming

To reproduce:

  • Start consuming from a topic
  • While consuming, do bin/kafka-configs.sh --zookeeper dockerhost:2181 --alter --entity-type topics --add-config 'retention.ms=1000, cleanup.policy=delete' --entity-name "thetopicname"
  • After a while, when messages get deleted, you'll get the below error.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x1277f0f]

goroutine 73 [running]:
main.(*consumeCmd).partitionLoop(0xc4200a0210, 0xc420313d40, 0x147f200, 0xc420314080, 0x7, 0x7fffffffffffffff)
	/private/var/folders/qk/xbjs71jn3pg85_vzg1k2vgf40000gn/T/nix-build-kt-11.1.0.drv-0/go/src/github.com/fgeller/kt/consume.go:396 +0x32f
main.(*consumeCmd).consumePartition(0xc4200a0210, 0xc420313d40, 0xc400000007)
	/private/var/folders/qk/xbjs71jn3pg85_vzg1k2vgf40000gn/T/nix-build-kt-11.1.0.drv-0/go/src/github.com/fgeller/kt/consume.go:352 +0x51b
main.(*consumeCmd).consume.func1(0xc420305c10, 0xc4200a0210, 0xc420313d40, 0x7)
	/private/var/folders/qk/xbjs71jn3pg85_vzg1k2vgf40000gn/T/nix-build-kt-11.1.0.drv-0/go/src/github.com/fgeller/kt/consume.go:318 +0x63
created by main.(*consumeCmd).consume
	/private/var/folders/qk/xbjs71jn3pg85_vzg1k2vgf40000gn/T/nix-build-kt-11.1.0.drv-0/go/src/github.com/fgeller/kt/consume.go:318 +0xea

`exec format error` when running Linux 1.0.0 binary

On Ubuntu 15.04 and Ubuntu 15.10, the following occurs when trying to download and run the Linux 1.0.0 release:

$ wget https://github.com/fgeller/kt/releases/download/v1.0.0/kt-1.0.0-linux.xz
$ xz -d kt-1.0.0-linux.xz
$ ./kt-1.0.0-linux
zsh: exec format error: ./kt-1.0.0-linux

Unreliable behaviour for `kt topic` on some topics

On v9.2.0 (a990512) I'm getting the following for some topics on a 0.10 cluster:

$ kt topic -brokers obfuscated:9092 -partitions --filter obfuscated.topic
$ kt topic -brokers obfuscated:9092 -partitions --filter obfuscated.topic
{"name":"obfuscated.topic","partitions":[{"id":0,"oldest":0,"newest":17}]}
$ kt topic -brokers obfuscated:9092 -partitions --filter obfuscated.topic
{"name":"obfuscated.topic","partitions":[{"id":0,"oldest":0,"newest":17}]}
$ kt topic -brokers obfuscated:9092 -partitions --filter obfuscated.topic
$

Add JSON output format

It'd be cool if there were an option to use a JSON output format compatible with https://github.com/echojc/kp, so that we can pipe output from kt into kp while preserving partitioning and keys. This would be useful for testing, debugging, and transferring data between (test) clusters.

Partitioner argument is not read

The -partitioner option for the producer doesn't seem to have any effect. It looks like it is not passed on to the cmd object in this section of the code:

cmd.batch = args.batch
cmd.timeout = args.timeout
cmd.verbose = args.verbose
cmd.pretty = args.pretty
cmd.literal = args.literal
cmd.partition = int32(args.partition)
cmd.version = kafkaVersion(args.version)
cmd.bufferSize = args.bufferSize
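// note: there is no cmd.partitioner = args.partitioner assignment in this
// block, which would explain why the flag is ignored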

Allow producing `null` payload messages

As per https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction, you can "delete" a message from a topic with log compaction enabled by sending a message with a null payload.

I tried producing such a message using kt by specifying the following JSON:

{"key": "1", "value": null, "partition": 0}

but the message produced by kt has an empty string as its payload, not null.

On a side note, the kt consume command doesn't seem to differentiate between "" and null message payloads for display purposes.
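
For comparison, here is a minimal sarama sketch of producing such a tombstone (broker address and topic name are placeholders, not kt's code); the point is that the message needs a nil Value rather than an empty one:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by the sync producer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// A nil Value (as opposed to an empty one) marks key "1" for deletion
	// on a topic with log compaction enabled.
	msg := &sarama.ProducerMessage{
		Topic: "compacted-topic",
		Key:   sarama.StringEncoder("1"),
		Value: nil,
	}
	if _, _, err := producer.SendMessage(msg); err != nil {
		log.Fatal(err)
	}
}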

group displays incorrect lag

When the offset is sarama.OffsetNewest (-1), the lag is computed incorrectly:

resolve(sarama.OffsetNewest) - offset
= resolve(sarama.OffsetNewest) - sarama.OffsetNewest
= resolve(sarama.OffsetNewest) - (-1)
= resolve(sarama.OffsetNewest) + 1
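
A minimal sketch of a guarded computation (the groupLag helper is hypothetical, not kt's code), assuming a sarama client: resolve the sentinel before subtracting instead of treating -1 as a concrete position.

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

// groupLag resolves the sentinel before subtracting, so a group that is
// fully caught up reports a lag of 0 rather than newest - (-1).
func groupLag(client sarama.Client, topic string, partition int32, committed int64) (int64, error) {
	newest, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
	if err != nil {
		return 0, err
	}
	if committed == sarama.OffsetNewest {
		committed = newest
	}
	return newest - committed, nil
}

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	lag, err := groupLag(client, "some-topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("lag:", lag) // 0 for a caught-up group
}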

Allow reading last X messages

Like tail -nX. Even nicer would be tail -f -nX, i.e., read from the last X messages and keep going.

I'm not sure what the syntax should be. Usually you'd use negative numbers, but that overlaps with your current usage of - for ranges. My inclination would be to change the range operator to something else, because negative numbers for reverse offsets feel really nice; but I'm not sure whether you want to change that.

Question: Understanding Resetting Partition Offsets

hi there,

I was pointed to this project because I was working on one very similar. The only problem is, I can't seem to get the reset working like yours, yet it looks like I'm using similar, if not identical, code. I was wondering, if you have the time, whether a second set of eyes (and a more knowledgeable one at that!) could help point out where exactly I am missing the functionality, and why?

Thanks ahead of time for any help you may provide!

func setConsumerGroupOffset(cmd *cobra.Command, args []string) {
	var setOffset int64
	var err error
	switch offsetString {
	case "oldest":
		setOffset = sarama.OffsetOldest
	case "newest":
		setOffset = sarama.OffsetNewest
	default:
		setOffset, err = strconv.ParseInt(offsetString, 10, 64)
		if err != nil || setOffset < 0 {
			log.Printf("[ERROR] Can't Parse/Use Given Offset: %v (err: %s)", setOffset, err)
		}
	}

	clientConfig := sarama.NewConfig()
	clientConfig.ClientID = "kafkactl"
	table := uitable.New()
	table.Wrap = true

	clientConn, err := sarama.NewClient(kafkactlCfg.brokers, clientConfig)
	if err != nil {
		log.Fatalf("[ERROR] Unable to connect to Kafka Brokers (err: %s)", err)
	}

	myPartitions, err := getPartitionsByTopic(clientConn, topic)
	if err != nil {
		return
	}
	defer clientConn.Close()

	om, err := sarama.NewOffsetManagerFromClient(consumerGroup, clientConn)
	if err != nil {
		log.Fatalf("[ERROR] Unable to Manage OFfsets (err: %s)", err)
	}
	defer om.Close()

	for _, partition := range myPartitions {
		pom, err := om.ManagePartition(topic, partition)
		if err != nil {
			log.Fatalf("[ERROR] Unable to connect to Offset Manager for Partition %v", partition)
		}
		defer pom.Close()

		pomOffset, pomMetadata := pom.NextOffset()
		log.Printf("[DEBUG] Offset: %v, Metadata: %s", pomOffset, pomMetadata)
		if setOffset == sarama.OffsetNewest || setOffset == sarama.OffsetOldest {
			setOffset, err = clientConn.GetOffset(topic, partition, setOffset)
			if err != nil {
				log.Fatalf("[ERROR] Unable to retrieve offset for topic/partition: %s/%v, err: %s", topic, partition, err)
			}
		}
		log.Printf("Setting offset for topic/partition: %s/%v, offset: %v", topic, partition, setOffset)
		pom.MarkOffset(setOffset, "")
	}
}

There are no errors from the above code. However, the partitions' lag never gets reset. I tried your program and it worked perfectly; I'm not sure where I'm going wrong!

Thanks again,
jbkc85

Delay creating handlers for interrupt signals until after client connects to Kafka

Thanks for making and maintaining kt! 😄

At the moment kt starts capturing interrupt signals before the client connects, which means that ^C while attempting to connect to Kafka will stall kt until the server responds. If the server doesn't respond (e.g., -brokers 8.8.8.8), kt will hang until the connection attempt times out.

You handle this in the topic command by spinning off a goroutine that waits for the interrupt channel to close, but the other commands don't have that at the moment.

I'm also not sure that's the right way to handle it? That particular goroutine calls failf, which calls os.Exit, which means deferred functions don't get executed. In that case, is it any different from using the default interrupt handler?
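
Here is a minimal sketch of the suggested ordering (broker address is a placeholder): as long as no handler is registered, ^C falls through to the default behaviour and kills the process even mid-connect; the custom handler only takes over once the client is up.

package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/Shopify/sarama"
)

func main() {
	// No handler registered yet: ^C during the dial terminates immediately.
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalf("failed to connect: %v", err)
	}
	defer client.Close()

	// Only now take over interrupt handling for a graceful shutdown,
	// letting deferred cleanup (client.Close above) actually run.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)
	<-sigs
	log.Println("received interrupt - shutting down...")
}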

Failed to send message results in kt not responding

Failed to send message, quitting. err=kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date. ^C2016/10/13 09:38:55 Received interrupt - shutting down...

When kt fails to produce a message, trying to kill it (Ctrl+C) just hangs forever.

`kt group` doesn't show consumer groups for Kafka v0.10.0.1

Hey Felix :)

We'd like to use kt group to view and manipulate offsets; however, the command doesn't show any of our groups on Kafka v0.10.0.1.

I've tested the exact same consumer code on Kafka 0.10.1.0 and in that case kt group shows the consumer groups as expected.

Note that we use sarama to create the consumer and mark offsets directly using sarama.PartitionOffsetManager.MarkOffset. We're not using sarama-cluster.

Do you know any reason why kt group wouldn't show all consumer groups for Kafka versions older than 0.10.1.0?

Not able to change the offset

kt group -group enews -topic actor-news -partitions 0
found 1 groups
found 1 topics
failed to read partition offset for topic=actor-news partition=0 err=EOF

I am using Kafka version 2.10-0.10.0.1.

I don't know why this happens. Can you please help me?

The Kafka logs show this error:

ERROR Closing socket for 127.0.0.1:9092-127.0.0.1:59373 because of error (kafka.network.Processor)
kafka.network.InvalidRequestException: Error getting request for apiKey: 2 and apiVersion: 1
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:87)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
at kafka.network.Processor.run(SocketServer.scala:413)

If I update my Kafka to kafka_2.10-0.10.1.0, the command works. But if I reset the consumer using
kt group -group enews -topic actor-news -partitions 0 -reset 1
and then start the consumer, it doesn't start sending me data from offset 1. Can you please explain how to reset it?

Unable to run kt: command not found

Hi,

I tried the steps provided in the readme, i.e. go get -u github.com/fgeller/kt.
However, I am unable to run the tool: I see "bash: kt: command not found" when typing kt in the terminal (macOS).
Please help.

pretty print output by default

  • pretty print output by default
  • don't pretty print when stdout is not a tty (see the sketch below)
  • add a flag to each command to control/force pretty printing

as triggered by #49
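
For the tty check, a sketch using only the standard library (kt may well end up doing this differently): treat stdout as a terminal when it is a character device.

package main

import (
	"fmt"
	"os"
)

// stdoutIsTTY reports whether stdout is a character device, i.e. an
// interactive terminal rather than a pipe or a file.
func stdoutIsTTY() bool {
	fi, err := os.Stdout.Stat()
	if err != nil {
		return false
	}
	return fi.Mode()&os.ModeCharDevice != 0
}

func main() {
	fmt.Println(stdoutIsTTY()) // false when piped, e.g. `go run main.go | cat`
}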

Consumer hangs when trying to consume old offsets on a 0.10 broker

When trying to consume from a topic using offsets older than a certain point, the consumer will hang until you kill it. It appears to work fine using offsetsNewest or an absolute value above some point. When consuming a topic with 155 log entries, starting with an offset anywhere under roughly 30 would hang; anything above that would work as normal.

This appears to only affect consuming against 0.10 clusters.

Document `-1-` to consume from latest offset

-offsets -1- to consume all partitions from the end

-offsets 0:-1- to consume partition 0 from the end

The current documentation implies that -1 will consume up to the message at offset 1, and it doesn't offer an explanation for -1-.

Avro / Schema Registry support

Avro + (optionally) Schema Registry support would be an awesome addition to kt for those of us who can't afford fat JSON payloads.

The Confluent Platform has two command-line tools, kafka-avro-console-producer and kafka-avro-console-consumer, which can either take the schema directly on the command line or connect to the Schema Registry. Similar functionality in kt would be really useful!

E.g.

$ ./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic test \
         --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

support specifying partitions when consuming

0:1-13 should read from partition 0 only and read messages between offsets 1 and 13

:1-13 should read from all partitions and read messages between offsets 1 and 13

0:1-13,6:23-48 should read from partition 0 between 1 and 13 and partition 6 between 23 and 48.

0:1- should read from partition 0 between 1 and whatever comes after
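
A rough sketch of a parser for this syntax (the interval type and parseOffsets are illustrative, not kt's actual implementation):

package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// interval captures one "partition:start-end" clause; partition -1 means
// all partitions, and end defaults to "whatever comes after".
type interval struct {
	partition int32
	start     int64
	end       int64
}

func parseOffsets(spec string) ([]interval, error) {
	var result []interval
	for _, clause := range strings.Split(spec, ",") {
		iv := interval{partition: -1, end: math.MaxInt64}
		rest := clause
		if i := strings.Index(clause, ":"); i >= 0 {
			if i > 0 {
				p, err := strconv.ParseInt(clause[:i], 10, 32)
				if err != nil {
					return nil, fmt.Errorf("bad partition in %q: %v", clause, err)
				}
				iv.partition = int32(p)
			}
			rest = clause[i+1:]
		}
		bounds := strings.SplitN(rest, "-", 2)
		if bounds[0] != "" {
			start, err := strconv.ParseInt(bounds[0], 10, 64)
			if err != nil {
				return nil, fmt.Errorf("bad start offset in %q: %v", clause, err)
			}
			iv.start = start
		}
		if len(bounds) == 2 && bounds[1] != "" {
			end, err := strconv.ParseInt(bounds[1], 10, 64)
			if err != nil {
				return nil, fmt.Errorf("bad end offset in %q: %v", clause, err)
			}
			iv.end = end
		}
		result = append(result, iv)
	}
	return result, nil
}

func main() {
	ivs, err := parseOffsets("0:1-13,6:23-48")
	if err != nil {
		panic(err)
	}
	fmt.Println(ivs) // [{0 1 13} {6 23 48}]
}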

How to set group for consume?

As I see it, kt consume works without a group, so every time it will start from the beginning.

Is there a way to specify a consumer group?

Consuming null message value should be consistent with producing

In reference to issue #24: while consuming a null-value message from Kafka, the output JSON should contain

"value": null

currently it outputs

"value": ""

This would be convenient for doing a data backup or a data transfer between topics. A current workaround is to use the jq tool to replace the empty string with null, but that only works if the topic is not supposed to contain empty strings. Here is the jq filter:

jq -c '{partition:.partition,key:.key,value:(if (.value|length) > 0 then .value else null end)}'
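
A sketch of the requested consume-side behaviour (the struct and helper are hypothetical): pointer-typed fields let a nil Kafka payload round-trip as JSON null while an empty payload stays "".

package main

import (
	"encoding/json"
	"fmt"

	"github.com/Shopify/sarama"
)

// consumedMessage is a hypothetical output type: pointer fields marshal
// as JSON null when nil, and as "" when pointing at an empty string.
type consumedMessage struct {
	Partition int32   `json:"partition"`
	Key       *string `json:"key"`
	Value     *string `json:"value"`
}

func toJSON(msg *sarama.ConsumerMessage) ([]byte, error) {
	out := consumedMessage{Partition: msg.Partition}
	if msg.Key != nil {
		k := string(msg.Key)
		out.Key = &k
	}
	if msg.Value != nil {
		v := string(msg.Value)
		out.Value = &v
	}
	return json.Marshal(&out)
}

func main() {
	tombstone := &sarama.ConsumerMessage{Partition: 0, Key: []byte("1")}
	b, _ := toJSON(tombstone)
	fmt.Println(string(b)) // {"partition":0,"key":"1","value":null}
}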
