ketu's People

Contributors

asafch, assafadato, eladleev, evg-tso, mdrdannyr, yonatane

ketu's Issues

Not able to pause/resume consumption

Hello,
I was looking for a way to pause/resume consumption using the library, but it seems that this is not supported.
I tried to achieve this "manually" by using the underlying Kafka consumer and the raw Java API, but since the consumer is not thread-safe, a ConcurrentModificationException is thrown when I attempt to call .pause().
It seems that the only way to achieve this is to call pause/resume from within the poll loop, but this loop is handled by the library itself and cannot be modified.
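
The constraint described above, that pause/resume must run on the thread that polls, can be sketched without Kafka at all. The following is a minimal sketch and not ketu's API: `start-poll-loop!`, `submit!`, and `stop-poll-loop!` are hypothetical names, and `consumer` stands in for any object that only one thread may touch. Other threads queue commands, and the loop thread applies them between polls.

```clojure
(import '(java.util.concurrent ConcurrentLinkedQueue))

(defn start-poll-loop!
  "Hypothetical helper. Calls `poll-once` repeatedly on a dedicated thread,
  the only thread that ever touches `consumer`. Before each poll it drains
  a queue of one-argument command functions and applies them to `consumer`."
  [consumer poll-once]
  (let [commands (ConcurrentLinkedQueue.)
        running? (atom true)
        run      (fn []
                   (while @running?
                     ;; Apply every queued command on this (the owning) thread.
                     (loop []
                       (when-let [command (.poll commands)]
                         (command consumer)
                         (recur)))
                     (poll-once consumer)))
        worker   (doto (Thread. ^Runnable run)
                   (.setDaemon true)
                   (.start))]
    {:commands commands :running? running? :worker worker}))

(defn submit!
  "Hypothetical helper. Queues `command` to run on the loop thread and
  returns a promise of its result. An exception thrown by `command` is
  delivered as the result instead of killing the loop."
  [{:keys [^ConcurrentLinkedQueue commands]} command]
  (let [result (promise)]
    (.add commands
          (fn [consumer]
            (deliver result
                     (try (command consumer)
                          (catch Exception e e)))))
    result))

(defn stop-poll-loop!
  "Hypothetical helper. Asks the loop to finish and waits for its thread."
  [{:keys [running? ^Thread worker]}]
  (reset! running? false)
  (.join worker))
```

With the raw Java client, `poll-once` would call `KafkaConsumer.poll`, and pausing would become something like `(submit! poll-loop (fn [c] (.pause c (.assignment c))))`, so both calls run on the polling thread. ketu's own loop would need a comparable hook for this to work with a source created by `source/source`.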

Sample Code

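;; Requires and imports implied by the aliases and classes used below
;; (the log output names the ketu.async.source namespace).
(require '[clojure.core.async :refer [chan]]
         '[ketu.async.source :as source])
(import '(org.apache.kafka.clients.consumer KafkaConsumer))

(let [consumer-source (source/source
                        (chan 10)
                        {:name       "test-consumer"
                         :brokers    "localhost:9092"
                         :topic      "test1"
                         :group-id   "group1"
                         :value-type :string
                         :shape      :value})
      ^KafkaConsumer kafka-consumer (:ketu.source/consumer consumer-source)]
  (Thread/sleep 1000) ; To allow consumer to start polling
  (try
    ;; Runs on the calling thread, not on ketu's consumer thread.
    (.pause kafka-consumer (.assignment kafka-consumer))
    (catch Exception e
      (println e))
    (finally
      (source/stop! consumer-source))))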

Output:

2022-10-13 10:12:25 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka version: 2.5.1  
2022-10-13 10:12:25 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka commitId: 0efa8fb0f4c73d92  
2022-10-13 10:12:25 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1665645145541  
2022-10-13 10:12:25 INFO - [ketu.async.source] [source=test-consumer] Start consumer thread  
2022-10-13 10:12:25 INFO - [o.a.k.clients.consumer.KafkaConsumer] [Consumer clientId=consumer-group1-25, groupId=group1] Subscribed to topic(s): test1  
2022-10-13 10:12:25 INFO - [org.apache.kafka.clients.Metadata] [Consumer clientId=consumer-group1-25, groupId=group1] Cluster ID: m86DoQ0mQZW-sSgHvQvrCA  
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] Discovered group coordinator 10.100.102.8:9092 (id: 2147483647 rack: null)  
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] (Re-)joining group  
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group  
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] (Re-)joining group  
#error {
 :cause KafkaConsumer is not safe for multi-threaded access
 :via
 [{:type java.util.ConcurrentModificationException
   :message KafkaConsumer is not safe for multi-threaded access
   :at [org.apache.kafka.clients.consumer.KafkaConsumer acquire KafkaConsumer.java 2421]}]
 :trace
 [[org.apache.kafka.clients.consumer.KafkaConsumer acquire KafkaConsumer.java 2421]
  [org.apache.kafka.clients.consumer.KafkaConsumer acquireAndEnsureOpen KafkaConsumer.java 2405]
  [org.apache.kafka.clients.consumer.KafkaConsumer assignment KafkaConsumer.java 899]
  [ketu.example$eval12819 invokeStatic example.clj 12]
  [ketu.example$eval12819 invoke example.clj 7]
  [clojure.lang.Compiler eval Compiler.java 7177]
  [clojure.lang.Compiler eval Compiler.java 7132]
  [clojure.core$eval invokeStatic core.clj 3214]
  [clojure.core$eval invoke core.clj 3210]
  [clojure.main$repl$read_eval_print__9086$fn__9089 invoke main.clj 437]
  [clojure.main$repl$read_eval_print__9086 invoke main.clj 437]
  [clojure.main$repl$fn__9095 invoke main.clj 458]
  [clojure.main$repl invokeStatic main.clj 458]
  [clojure.main$repl doInvoke main.clj 368]
  [clojure.lang.RestFn invoke RestFn.java 1523]
  [nrepl.middleware.interruptible_eval$evaluate invokeStatic interruptible_eval.clj 79]
  [nrepl.middleware.interruptible_eval$evaluate invoke interruptible_eval.clj 55]
  [nrepl.middleware.interruptible_eval$interruptible_eval$fn__923$fn__927 invoke interruptible_eval.clj 142]
  [clojure.lang.AFn run AFn.java 22]
  [nrepl.middleware.session$session_exec$main_loop__1024$fn__1028 invoke session.clj 171]
  [nrepl.middleware.session$session_exec$main_loop__1024 invoke session.clj 170]
  [clojure.lang.AFn run AFn.java 22]
  [java.lang.Thread run Thread.java 832]]}

Not able to commit an offset or seek to an offset manually

I am trying to commit an offset manually using the commit-async! function from the ketu.clients.consumer namespace, but I am getting this error.

Execution error (ConcurrentModificationException) at org.apache.kafka.clients.consumer.KafkaConsumer/acquire (KafkaConsumer.java:2421).
KafkaConsumer is not safe for multi-threaded access

Sample code

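;; Requires and imports implied by the aliases and classes used below.
(require '[clojure.core.async :refer [chan]]
         '[ketu.async.source :as source]
         '[ketu.clients.consumer :as c])
(import '(org.apache.kafka.clients.consumer OffsetAndMetadata)
        '(org.apache.kafka.common TopicPartition))

(defn consume-topic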
  "Consume message from Kafka"
  [src-chan topic]
  (let [source-opts {:name "topic-consumer"
                     :brokers "localhost:9092"
                     :topic topic
                     :group-id "group-1"
                     :key-type :string
                     :auto-offset-reset "earliest"
                     :value-type :string
                     :internal-config {"enable.auto.commit" false}
                     :shape [:map :key :value :topic :offset]}]
    (source/source src-chan source-opts)))


(defn commit-offset
  "Commit offset"
  [consumer topic offset]
  (c/commit-async! consumer
                   {(TopicPartition. topic 0)
                    (OffsetAndMetadata. offset)}
                   (c/commit-callback (fn [x e]
                                        (prn "committed" x)))))


(def channel (chan 100))

(def src (consume-topic channel "topic"))

(commit-offset (:ketu.source/consumer src)
               "topic1"
               5)

I don't know what is wrong here. Any pointers would be great. Thanks.
