
cues's People

Contributors

zalky


Forkers

jamesnyika ghlian

cues's Issues

Non-blocking alts does not remove all watches

cues.queue/alts does not remove all controller and unblock watches. Watches that are not removed impose a minor performance penalty under certain use cases. Queue behaviour and message delivery should be unaffected.

Processor Output: Dynamic Maps causing "Key must be an integer"

Hi
I have some processor code that dynamically builds the output queue map, but it fails with an IllegalArgumentException.

Code

The processor generates a map with two keys; each key's value is a vector of maps.


(defmethod q/processor ::data-splitter
  ;; This processor splits a message across multiple channels.
  [ctx input]
  (println "===> Processor: data-splitter")
  (tap> "======ctx=========")
  (tap> ctx)
  (tap> "======input=========")
  (tap> input)
  ;; Take the input and split it up by output key.
  (let [proc-ctx  (:ctx (:config ctx))
        ;; The channel keys are the split keys.
        outkeys   (keys (:out (:config ctx)))
        ikey      (:receivekey proc-ctx)
        fullinput (if (nil? ikey) {} (get-in input ikey))
        splitdata (select-keys fullinput outkeys)
        output    (into {} (map vector (keys splitdata) (vals splitdata)))]
    (println "--data split complete--")
    (tap> "----------------->> Output class:")
    (tap> (class output))
    (tap> "----------------->> Output:")
    (tap> output)
    output))

The resulting data looks like this:

   {:scheduled_arrivals   [{:flt 1, :gate "C2"} {:flt 2, :gate "C3"} {:flt 3, :gate "B2"}]
    :scheduled_departures [{:flt 4, :gate "D2"} {:flt 5, :gate "G3"} {:flt 6, :gate "T2"}]}

Error Message

2023-10-27T01:26:31.829Z Aurelius ERROR [cues.queue:1013] - :util.processors/airports-schedule-splitter ({:input :util.cues/airports-schedule-raw} -> {:scheduled_arrivals :util.cues/arrivals, :scheduled_departures :util.cues/departures})
                              java.lang.Thread.run              Thread.java:  833
java.util.concurrent.ThreadPoolExecutor$Worker.run  ThreadPoolExecutor.java:  635
 java.util.concurrent.ThreadPoolExecutor.runWorker  ThreadPoolExecutor.java: 1136
               java.util.concurrent.FutureTask.run          FutureTask.java:  264
                                               ...
               clojure.core/binding-conveyor-fn/fn                 core.clj: 2047
                          cues.queue/start-join/fn                queue.clj: 1204
                         cues.queue/processor-loop                queue.clj: 1160
                         cues.queue/processor-step                queue.clj: 1005
                          cues.queue/processor-run                queue.clj:  951
                         cues.queue/default-run-fn                queue.clj:  941
                             cues.queue/merge-meta                queue.clj:  921
                         cues.queue/assoc-meta-out                queue.clj:  909
                            clojure.core/reduce-kv                 core.clj: 6928
                       clojure.core.protocols/fn/G            protocols.clj:  175
                                   clojure.core/fn                 core.clj: 6917
                                               ...
                      cues.queue/assoc-meta-out/fn                queue.clj:  912
                               clojure.core/update                 core.clj: 6239
                                clojure.core/assoc                 core.clj:  193
                                               ...
java.lang.IllegalArgumentException: Key must be integer

What is interesting is that if I build the map myself, i.e. hard-code a map at the end of the processor, it seems to work, but I cannot seem to do it dynamically. Any suggestions on how to fix this?

Thank you in advance for all help!
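
One possible reading of the trace above, offered as an assumption rather than a confirmed diagnosis: the failing frames are assoc-meta-out, update and assoc, and assoc on a vector throws exactly this exception when the key is not an integer. That would fit cues attaching metadata to each outbound message with assoc, which works when each message is a map but not when it is a vector of maps. The sketch below uses plain Clojure only and calls no cues functions; the :records key and the wrap-vectors and assoc-error-message helpers are hypothetical names chosen for illustration.

```clojure
;; Plain Clojure only; no cues functions are called here.

(defn assoc-error-message
  "Returns the exception message thrown when assoc-ing key k onto coll,
  or nil if the assoc succeeds."
  [coll k v]
  (try
    (assoc coll k v)
    nil
    (catch IllegalArgumentException e
      (.getMessage e))))

;; assoc with a keyword key fails on a vector ...
(assoc-error-message [{:flt 1 :gate "C2"}] :meta {})

;; ... while the same assoc succeeds on a map.
(assoc-error-message {:flt 1 :gate "C2"} :meta {})

(defn wrap-vectors
  "Hypothetical helper: wraps every non-map value of an output map in a
  map under :records, so that each outbound message is itself a map."
  [output]
  (reduce-kv (fn [m k v]
               (assoc m k (if (map? v) v {:records v})))
             {}
             output))

(wrap-vectors {:scheduled_arrivals   [{:flt 1 :gate "C2"}]
               :scheduled_departures [{:flt 4 :gate "D2"}]})
```

If this reading is right, returning (wrap-vectors output) from the processor would avoid the exception, and downstream processors would read the vector back with (:records msg).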

Multiple non-persistent tailers on one queue do not always unblock

When more than one non-persistent tailer (any tailer without an id) is created on the same queue, only one of them will unblock.

This does not affect persistent tailers (any tailer that has been given an id), or any tailers that participate in graphs (all tailers that participate in graphs have an id).

Repro

The following will hang on the last line:

(let [done-1 (promise)
      done-2 (promise)
      q      (q/queue ::tmp)
      t1     (q/tailer q) ; no id given
      t2     (q/tailer q) ; no id given
      a      (q/appender q)]
  (future (deliver done-1 (q/read!! t1)))
  (future (deliver done-2 (q/read!! t2)))
  (Thread/sleep 1)
  (q/write a {:x 1})
  (= @done-2 {:x 1})
  (= @done-1 {:x 1}))

The solution is to ensure all tailers have ids: non-persistent tailers receive ephemeral ids that are not passed along to the ChronicleQueue tailer constructor.

The user facing API does not change.

Oracle Java 20

nREPL server started on port 64179 on host localhost - nrepl://localhost:64179
(def q (q/queue ::my-queue))
Reflection warning, qbits/tape/tailer.clj:88:21 - call to static method sleep on java.lang.Thread can't be resolved (argument types: java.lang.Object).
Reflection warning, qbits/tape/tailer.clj:102:18 - call to static method sleep on java.lang.Thread can't be resolved (argument types: java.lang.Object).
=> nil
Execution error (NoSuchMethodException) at java.lang.Class/getDeclaredMethod (Class.java:2772).
sun.nio.ch.FileChannelImpl.unmap0(long,long)
(def a (q/appender q))
Execution error (AssertionError) at cues.queue/appender (queue.clj:211).
Assert failed: (queue? queue)

Same JVM opts as for Java 17. Same code and deps.edn works on Amazon Corretto Java 17.

Regression in ensure-done

Stateful processor implementation is now stored in a reference, but the new implementation of ensure-done fails to properly dereference.

Getting a single processor to generate multiple messages onto the next queue

Hi
This is a question rather than an issue: how would I, within a single processor (e.g. a JDBC database source processor) that reads 5 records from the database, process all five records and emit the result of processing each record as a separate output message on the following queue for serial processing?

I hope the question is clear: a processor generates or reads 5 records and wants to pass them to the next stage of the graph as single records.

Result sets from next.jdbc not serializable with cues

Hi
Thanks for this amazing library wrapper. I am trying to create a processor graph that uses next.jdbc from Sean Corfield (https://cljdoc.org/d/com.github.seancorfield/next.jdbc/1.3.874/doc/datafy-nav-and-schema), and it returns functions for datafy and nav in its metadata.
Putting a result set on a queue (even a plain map, but with functions in its metadata) causes the processors to fail, since the functions are in the metadata automatically. I tried to set my graph to ignore metadata, but it does not seem to have any effect:

Here is the graph

(defn pgraph []
    {:id         ::fuel-graph
     :errors     ::graph-errors
     :source     ::source
     :queue-opts {::q/default {:queue-meta false}
                  :util.cues/notification_sources {:queue-meta false}
                  ::source {:queue-meta false}}
     :processors [{:id ::source}
                  {:id :util.processors/jdbc-reader
                   :queue-opts {::q/default {:queue-meta false}}
                   :in {:input ::source}
                   :out {:output :util.cues/notification_sources}}
                  ;; first logger
                  {:id  ::logger1
                   :fn  :util.processors/tap-processor
                   :in  {:input :util.cues/notification_sources}}]})

Here are the processors


(defmethod q/processor ::tap-processor
  ;; this processor taps out a message without change
  [ctx msg]
  (println "===> Processor: tap-processor")

  ;; simply taps out a message
  (tap> ctx)
  (tap> msg)
  nil)


(defmethod q/processor ::jdbc-reader
  ;; Uses a connection to read a message and return it.
  [stuff {:keys [input]}]
  (println "===> Processor: jdbc-reader")
  (let [c     (:conspec input)
        cat   (:cat input)
        q     (:query input)
        ;; This core.async call pulls the data; it works nicely.
        r     (async/<!! (wg/go-runner (eval (symbol q)) c cat))
        skeys (map #(keyword (str "r" %)) (range 0 (count r)))
        k     (map #(select-keys % [:source_uri :source_type]) r)
        z     (zipmap skeys k)]
    ;; Execute the query and return the results as output.
    ;; Putting z here fails because it somehow still sees the navify functions.
    {:output {:rs z}}))

Here is the failure exception


===> Processor: jdbc-reader - skeys:  (:r0 :r1)
===> Processor: jdbc-reader - k:  ({:source_uri https://www.epra.go.ke/wp-content/uploads/2021/11/15th-November-14th-December-2021-1.csv, :source_type excel_file} {:source_uri https://www.epra.go.ke/wp-content/uploads/2021/11/15th-November-14th-December-2021-1.csv, :source_type excel_file})
===> Processor: jdbc-reader - z:  {:r0 {:source_uri https://www.epra.go.ke/wp-content/uploads/2021/11/15th-November-14th-December-2021-1.csv, :source_type excel_file}, :r1 {:source_uri https://www.epra.go.ke/wp-content/uploads/2021/11/15th-November-14th-December-2021-1.csv, :source_type excel_file}}
ERROR [cues.queue:1013] - :util.processors/jdbc-reader ({:input :util.cues/source} -> {:output :util.cues/notification_sources})
                              java.lang.Thread.run              Thread.java:  833
java.util.concurrent.ThreadPoolExecutor$Worker.run  ThreadPoolExecutor.java:  635
 java.util.concurrent.ThreadPoolExecutor.runWorker  ThreadPoolExecutor.java: 1136
               java.util.concurrent.FutureTask.run          FutureTask.java:  264
                                               ...
               clojure.core/binding-conveyor-fn/fn                 core.clj: 2047
                          cues.queue/start-join/fn                queue.clj: 1204
                         cues.queue/processor-loop                queue.clj: 1160
                         cues.queue/processor-step                queue.clj: 1005
                          cues.queue/processor-run                queue.clj:  952
                                               ...
                           cues.queue/eval16136/fn                queue.clj:  761
                           cues.queue/attempt-full                queue.clj:  747
                          cues.queue/wrap-write/fn                queue.clj:  464
                      cues.queue/wrap-throwable/fn                queue.clj:  735
                        cues.queue/wrap-attempt/fn                queue.clj:  715
                             cues.queue/encode-msg                queue.clj:  694
                      cues.queue/codec/reify/write                queue.clj:   80
                             taoensso.nippy/freeze                nippy.clj: 1331
                             taoensso.nippy/freeze                nippy.clj: 1337
                 taoensso.nippy/call-with-bindings                nippy.clj: 1279
                          taoensso.nippy/freeze/fn                nippy.clj: 1356
                       taoensso.nippy/freeze/fn/fn                nippy.clj: 1356
                     taoensso.nippy/eval14559/fn/G                nippy.clj:  570
                       taoensso.nippy/eval14578/fn                nippy.clj:  578
                     taoensso.nippy/eval14531/fn/G                nippy.clj:  569
                       taoensso.nippy/eval14827/fn                nippy.clj: 1174
                          taoensso.nippy/write-map                nippy.clj:  886
                            clojure.core/reduce-kv                 core.clj: 6919
                       clojure.core.protocols/fn/G            protocols.clj:  175
                                   clojure.core/fn                 core.clj: 6908
                                               ...
                       taoensso.nippy/write-map/fn                nippy.clj:  886
                    taoensso.nippy/write-map/fn/fn                nippy.clj:  889
                     taoensso.nippy/eval14559/fn/G                nippy.clj:  570
                       taoensso.nippy/eval14578/fn                nippy.clj:  578
                     taoensso.nippy/eval14531/fn/G                nippy.clj:  569
                       taoensso.nippy/eval14827/fn                nippy.clj: 1174
                          taoensso.nippy/write-map                nippy.clj:  886
                            clojure.core/reduce-kv                 core.clj: 6919
                       clojure.core.protocols/fn/G            protocols.clj:  175
                                   clojure.core/fn                 core.clj: 6908
                                               ...
                       taoensso.nippy/write-map/fn                nippy.clj:  886
                    taoensso.nippy/write-map/fn/fn                nippy.clj:  889
                     taoensso.nippy/eval14559/fn/G                nippy.clj:  570
                       taoensso.nippy/eval14578/fn                nippy.clj:  577
                     taoensso.nippy/eval14531/fn/G                nippy.clj:  569
                       taoensso.nippy/eval14827/fn                nippy.clj: 1174
                          taoensso.nippy/write-map                nippy.clj:  886
                            clojure.core/reduce-kv                 core.clj: 6919
                       clojure.core.protocols/fn/G            protocols.clj:  175
                                   clojure.core/fn                 core.clj: 6908
                                               ...
                       taoensso.nippy/write-map/fn                nippy.clj:  886
                    taoensso.nippy/write-map/fn/fn                nippy.clj:  889
                     taoensso.nippy/eval14559/fn/G                nippy.clj:  570
                       taoensso.nippy/eval14578/fn                nippy.clj:  578
                     taoensso.nippy/eval14531/fn/G                nippy.clj:  569
                       taoensso.nippy/eval14867/fn                nippy.clj: 1255
                  taoensso.nippy/throw-unfreezable                nippy.clj: 1005
clojure.lang.ExceptionInfo: Unfreezable type: class next.jdbc.result_set$navize_row$fn__49308
    as-str: "#object[next.jdbc.result_set$navize_row$fn__49308 0x23c00cbf \"next.jdbc.result_set$navize_row$fn__49308@23c00cbf\"]"
      type: next.jdbc.result_set$navize_row$fn__49308
clojure.lang.ExceptionInfo: Unfreezable type: class next.jdbc.result_set$navize_row$fn__49308
      err.proc/config: {:id :util.processors/jdbc-reader,
                        :in {:input :util.cues/source},
                        :out {:output :util.cues/notification_sources},
                        :strategy :cues.queue/exactly-once}

(There is a lot more, but you can see the exception info just above here.)

Any suggestions to help get around this ?
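
A possible workaround, sketched under the assumption that the unfreezable function lives in the rows' metadata and that nippy is serializing that metadata along with the data: select-keys preserves the metadata of the map it is given, so rows built with select-keys would still carry the datafy/nav metadata from next.jdbc. The sketch below uses clojure.core only; the sample row, its URL and the strip-meta helper are hypothetical stand-ins, not part of next.jdbc or cues.

```clojure
;; Plain Clojure only. The row below merely imitates a next.jdbc row by
;; carrying a function in its metadata; the URL is a placeholder.

(def row
  (with-meta {:source_uri  "https://example.com/a.csv"
              :source_type "excel_file"
              :other_col   1}
    {'clojure.core.protocols/datafy (fn [r] r)}))

;; select-keys keeps the metadata of the original map, function included.
(def selected (select-keys row [:source_uri :source_type]))

(defn strip-meta
  "Hypothetical helper: returns the same map with no metadata attached."
  [m]
  (with-meta m nil))

;; Strip metadata from every row before it is placed in the output message.
(def plain-rows (mapv strip-meta [selected]))

;; The selected row still has metadata; the stripped row does not.
(some? (meta selected))
(meta (first plain-rows))
```

In the jdbc-reader processor above, this would amount to mapping strip-meta over k before building z, if the assumption about the metadata holds.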
