appsflyer / ketu
A Clojure Kafka client with core.async integration.
License: Other
Hello,
I was looking for a way to pause and resume consumption using the library, but it seems that this is not supported.
I tried to achieve this "manually" by using the underlying Kafka consumer and the raw Java API, but since the consumer is not thread-safe, a ConcurrentModificationException is thrown when I attempt to call .pause().
It seems that the only way to achieve this is to call pause/resume from within the poll loop, but that loop is handled by the library itself and cannot be modified.
(require '[clojure.core.async :refer [chan]]
         '[ketu.async.source :as source])
(import 'org.apache.kafka.clients.consumer.KafkaConsumer)

(let [consumer-source (source/source
                        (chan 10)
                        {:name "test-consumer"
                         :brokers "localhost:9092"
                         :topic "test1"
                         :group-id "group1"
                         :value-type :string
                         :shape :value})
      ^KafkaConsumer kafka-consumer (:ketu.source/consumer consumer-source)]
  (Thread/sleep 1000) ; To allow consumer to start polling
  (try
    ;; Called from the calling thread while ketu's consumer thread is polling
    (.pause kafka-consumer (.assignment kafka-consumer))
    (catch Exception e
      (println e))
    (finally
      (source/stop! consumer-source))))
Output:
2022-10-13 10:12:25 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka version: 2.5.1
2022-10-13 10:12:25 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka commitId: 0efa8fb0f4c73d92
2022-10-13 10:12:25 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1665645145541
2022-10-13 10:12:25 INFO - [ketu.async.source] [source=test-consumer] Start consumer thread
2022-10-13 10:12:25 INFO - [o.a.k.clients.consumer.KafkaConsumer] [Consumer clientId=consumer-group1-25, groupId=group1] Subscribed to topic(s): test1
2022-10-13 10:12:25 INFO - [org.apache.kafka.clients.Metadata] [Consumer clientId=consumer-group1-25, groupId=group1] Cluster ID: m86DoQ0mQZW-sSgHvQvrCA
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] Discovered group coordinator 10.100.102.8:9092 (id: 2147483647 rack: null)
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] (Re-)joining group
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] (Re-)joining group
#error {
:cause KafkaConsumer is not safe for multi-threaded access
:via
[{:type java.util.ConcurrentModificationException
:message KafkaConsumer is not safe for multi-threaded access
:at [org.apache.kafka.clients.consumer.KafkaConsumer acquire KafkaConsumer.java 2421]}]
:trace
[[org.apache.kafka.clients.consumer.KafkaConsumer acquire KafkaConsumer.java 2421]
[org.apache.kafka.clients.consumer.KafkaConsumer acquireAndEnsureOpen KafkaConsumer.java 2405]
[org.apache.kafka.clients.consumer.KafkaConsumer assignment KafkaConsumer.java 899]
[ketu.example$eval12819 invokeStatic example.clj 12]
[ketu.example$eval12819 invoke example.clj 7]
[clojure.lang.Compiler eval Compiler.java 7177]
[clojure.lang.Compiler eval Compiler.java 7132]
[clojure.core$eval invokeStatic core.clj 3214]
[clojure.core$eval invoke core.clj 3210]
[clojure.main$repl$read_eval_print__9086$fn__9089 invoke main.clj 437]
[clojure.main$repl$read_eval_print__9086 invoke main.clj 437]
[clojure.main$repl$fn__9095 invoke main.clj 458]
[clojure.main$repl invokeStatic main.clj 458]
[clojure.main$repl doInvoke main.clj 368]
[clojure.lang.RestFn invoke RestFn.java 1523]
[nrepl.middleware.interruptible_eval$evaluate invokeStatic interruptible_eval.clj 79]
[nrepl.middleware.interruptible_eval$evaluate invoke interruptible_eval.clj 55]
[nrepl.middleware.interruptible_eval$interruptible_eval$fn__923$fn__927 invoke interruptible_eval.clj 142]
[clojure.lang.AFn run AFn.java 22]
[nrepl.middleware.session$session_exec$main_loop__1024$fn__1028 invoke session.clj 171]
[nrepl.middleware.session$session_exec$main_loop__1024 invoke session.clj 170]
[clojure.lang.AFn run AFn.java 22]
[java.lang.Thread run Thread.java 832]]}
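The constraint described above (a consumer that may only be touched by the thread running the poll loop) is commonly handled by handing operations to that thread through a queue. Below is a minimal sketch of that pattern only. It does not use ketu or Kafka: make-fake-consumer, start-poll-loop and submit! are hypothetical names invented for this illustration, and the "consumer" is a plain map standing in for a KafkaConsumer. Whether ketu's poll loop can expose such a hook is an open question here, not something this sketch demonstrates.

```clojure
(import '(java.util.concurrent ConcurrentLinkedQueue))

;; Hypothetical stand-in for a KafkaConsumer: mutable state that must only
;; be modified from the thread that runs the poll loop.
(defn make-fake-consumer []
  {:paused? (volatile! false)
   :polled  (volatile! 0)})

(defn submit!
  "Enqueues the one-argument function `f` to be run on the poll thread.
  Returns a promise that receives f's result (or the exception it threw)."
  [^ConcurrentLinkedQueue commands f]
  (let [result (promise)]
    (.add commands
          (fn [consumer]
            (deliver result (try (f consumer)
                                 (catch Exception e e)))))
    result))

(defn start-poll-loop
  "Starts a thread that owns `consumer`. Before each simulated poll it
  drains `commands` and applies every queued function on this thread."
  [consumer ^ConcurrentLinkedQueue commands]
  (let [running (atom true)
        body    (fn []
                  (while @running
                    ;; Apply pending commands on the owning thread.
                    (loop []
                      (when-let [cmd (.poll commands)]
                        (cmd consumer)
                        (recur)))
                    ;; Simulated poll: counts only while not paused.
                    (when-not @(:paused? consumer)
                      (vswap! (:polled consumer) inc))
                    (Thread/sleep 10)))
        thread  (Thread. ^Runnable body)]
    (.start thread)
    {:running running :thread thread}))

;; Usage: request a pause from the calling thread; the poll thread applies it.
(let [consumer (make-fake-consumer)
      commands (ConcurrentLinkedQueue.)
      {:keys [running thread]} (start-poll-loop consumer commands)]
  ;; Blocks until the poll thread has executed the command.
  (println "pause applied:" @(submit! commands #(vreset! (:paused? %) true)))
  (reset! running false)
  (.join ^Thread thread))
```

With a real KafkaConsumer, the queued function would call .pause or .resume instead of flipping a flag, and it would still run on the polling thread, which is what avoids the ConcurrentModificationException.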
I am trying to commit offsets manually using the commit-async! function from the ketu.clients.consumer namespace, but I am getting this error:
Execution error (ConcurrentModificationException) at org.apache.kafka.clients.consumer.KafkaConsumer/acquire (KafkaConsumer.java:2421).
KafkaConsumer is not safe for multi-threaded access
(require '[clojure.core.async :refer [chan]]
         '[ketu.async.source :as source]
         '[ketu.clients.consumer :as c])
(import '(org.apache.kafka.clients.consumer OffsetAndMetadata)
        '(org.apache.kafka.common TopicPartition))

(defn consume-topic
  "Consume messages from Kafka"
  [src-chan topic]
  (let [source-opts {:name "topic-consumer"
                     :brokers "localhost:9092"
                     :topic topic
                     :group-id "group-1"
                     :key-type :string
                     :auto-offset-reset "earliest"
                     :value-type :string
                     :internal-config {"enable.auto.commit" false}
                     :shape [:map :key :value :topic :offset]}]
    (source/source src-chan source-opts)))

(defn commit-offset
  "Commit offset"
  [consumer topic offset]
  (c/commit-async! consumer
                   {(TopicPartition. topic 0)
                    (OffsetAndMetadata. offset)}
                   (c/commit-callback (fn [x e]
                                        (prn "committed" x)))))

(def channel (chan 100))
(def src (consume-topic channel "topic"))

;; Called from the calling thread while the source's consumer thread is polling
(commit-offset (:ketu.source/consumer src)
               "topic"
               5)
I don't know what is wrong here. Any pointers would be great. Thanks.