atheriel / longears
The RabbitMQ client for R
Home Page: https://atheriel.github.io/longears/
Hi,
I am looking for help keeping the consumer alive so that it listens for messages continuously. Is there such a feature in this library?
Another question: how do I read the message text (body) from an amqp_message object?
Thanks,
Surendhar
Hello,
The timeout is limited to 10 seconds.
Is this a limitation of RabbitMQ?
Line 137 in 2c1f841
Congratulations on the project.
Thanks
This is possible via the underlying librabbitmq (see example) but has not been implemented in longears.
First of all, thank you for this great package!
I've been trying different ways to implement a consumer in an R Shiny app, but haven't been able to figure out how to do it with amqp_consume() or amqp_consume_later() without running into problems.
In your documentation you mention that amqp_consume_later() is "primarily for use inside applications (especially Shiny applications)". However, I haven't been able to figure out how to make this work properly, and unfortunately I haven't seen any threads on Stack Overflow or examples on GitHub.
It would be a great help if you could expand the documentation, e.g. with a minimal working example.
Hi @atheriel. Thanks for your great work on the RabbitMQ client.
I see that the client sends the acknowledgement before the callback is executed, regardless of the callback's outcome. Shouldn't we send it only if the callback hasn't thrown an error? Otherwise the message is not properly consumed and nobody notices, so the message is effectively lost to the system.
I'm trying to publish a message that consists of raw data (in my case MessagePack, but as an example I'm just sending a byte-encoded string).
library(longears)
con = amqp_connect()
msg = charToRaw("test")
amqp_publish(con, msg, exchange = "", routing_key = "test")
However, somewhere internally it gets encoded into "NA".
I installed longears and successfully connected to my RabbitMQ broker, but I don't understand why the payloads look like "66 69 72 73 74" instead of simple text like "first".
The same happens in the examples in the README, so I suppose there must be some reason to encode the payload like this. But how do I decode the payload?
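The bytes in the question are the hexadecimal codes of the characters in "first", so the payload is a raw vector rather than encoded text. A minimal sketch of the conversion in base R follows. It uses a hand-written raw vector instead of a live message; applying the same call to the body of a received message assumes that longears exposes the body as a raw vector, which is not confirmed here.

```r
# The bytes shown in the report, written as a raw vector.
payload <- as.raw(c(0x66, 0x69, 0x72, 0x73, 0x74))

# rawToChar() converts a raw vector into a character string.
text <- rawToChar(payload)
print(text)
# [1] "first"

# charToRaw() goes the other way, turning text into bytes.
stopifnot(identical(charToRaw(text), payload))
```

Payloads that are not text (for example MessagePack data) would need their own decoder instead of rawToChar().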
#10 added fairly sophisticated control of acknowledgements, but the documentation isn't very discoverable and these techniques are too specialised to put in the README. It makes sense to add a vignette about them instead.
Can't install on https://github.com/r-hub/r-minimal; compilation fails with the error
"consume_later.cpp:181:18: error: aggregate 'timeval tv' has incomplete type and cannot be defined", because of Alpine's timeval.
Hi. Since R is single-threaded by default, and we often use queues to farm out batch or long-running work, I think it makes a lot of sense to allow the prefetch size to be changed from 50, which seems very high, especially as the unit of work grows. I totally understand the desire to keep many of the lower-level API details hidden, which is great, but there should be a way to do this as an optional flag.
Line 11 in 268398f
For sending and immediately receiving messages, RabbitMQ has the Direct Reply-To feature: https://www.rabbitmq.com/direct-reply-to.html
I can't get it to work with this code, even though there's no error in the consumer or the queue and the message is sent back:
library(longears)
conn <- amqp_connect()
consumer <- amqp_consume(
  conn = conn,
  no_ack = TRUE,
  # prefetch_count = 1,
  timeout = 15,
  queue = "amq.rabbitmq.reply-to",
  fun = function(msg) {
    received <<- received + 1
  }
)
amqp_publish(
  conn, '{"content":1707163749285}',
  exchange = "", routing_key = "QUEUE_NAME",
  properties = amqp_properties(
    content_type = "application/json",
    reply_to = "amq.rabbitmq.reply-to.bdwkehdwhecbhjwe",
    delivery_mode = 2
  )
)
while (TRUE) {
  amqp_listen(conn, timeout = 15)
}
Is it possible to install it on Google Colab (http://colab.fan/r)?
As the summary reads. Please verify.
Hi,
I always have to reconnect manually if the connection has been idle for more than a minute or so.
> amqp_publish(conn, message.raw, exchange = "run.function", routing_key = "#")
Error in amqp_publish(conn, message.raw, exchange = "run.function", routing_key = "#") :
Failed to publish message. Disconnected from server
I then have to reconnect and try again:
> amqp_reconnect(conn)
> amqp_publish(conn, message.raw, exchange = "run.function", routing_key = "#")
>
That's not a problem per se, but ?amqp_reconnect says:
When possible, we automatically recover from connection errors, so manual reconnection is not usually necessary.
So before I write tryCatch() wrappers I wanted to ask if there's something I'm doing wrong. I followed the Basic Usage example, just with a remote server. Is this related to the timeout= parameter in amqp_connect? Do I have to set some other parameter to enable automatic reconnects?
Thank you.
Best regards,
Stefan
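For reference, the tryCatch() wrapper mentioned in this report could look roughly like the sketch below. It is not part of longears: retry_once, fake_publish, and fake_reconnect are hypothetical names, and the broker connection is simulated so that the example runs on its own.

```r
# Hypothetical helper (not part of longears): run `action`; if it raises an
# error, call `recover` once and then try `action` a second time.
retry_once <- function(action, recover) {
  tryCatch(
    action(),
    error = function(e) {
      recover()
      action()
    }
  )
}

# Simulated connection state, standing in for a real broker connection.
state <- new.env()
state$connected <- FALSE
state$sent <- 0L

# Fails while "disconnected", mimicking the error message in the report.
fake_publish <- function() {
  if (!state$connected) {
    stop("Failed to publish message. Disconnected from server")
  }
  state$sent <- state$sent + 1L
  invisible(TRUE)
}

# Marks the simulated connection as restored.
fake_reconnect <- function() {
  state$connected <- TRUE
}

# First attempt fails, the connection is restored, second attempt succeeds.
retry_once(fake_publish, fake_reconnect)
```

With a real connection, the two arguments would presumably wrap the calls from the report, for example function() amqp_publish(conn, message.raw, exchange = "run.function", routing_key = "#") and function() amqp_reconnect(conn). A second failure is not caught and propagates to the caller.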
Hello,
Apologies if this is not the right place to request support. I must preface this with the disclaimer that I am primarily a Java developer crossing over. I have limited experience with R and none with C++, so the probability that I'm doing something wrong is quite high.
Using the README.md as a guideline, I am trying to get started with longears to implement a RabbitMQ consumer in R.
Installing the longears package from GitHub fails. These are the last lines on the terminal:
** libs
gcc -std=gnu99 -I"/usr/share/R/include" -DNDEBUG -I. -I'/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/later/include' -I'/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/Rcpp/include' -fpic -g -O2 -fdebug-prefix-map=/build/r-base-tRgc13/r-base-4.0.5=. -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -g -c basic.c -o basic.o
gcc -std=gnu99 -I"/usr/share/R/include" -DNDEBUG -I. -I'/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/later/include' -I'/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/Rcpp/include' -fpic -g -O2 -fdebug-prefix-map=/build/r-base-tRgc13/r-base-4.0.5=. -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -g -c bind.c -o bind.o
gcc -std=gnu99 -I"/usr/share/R/include" -DNDEBUG -I. -I'/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/later/include' -I'/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/Rcpp/include' -fpic -g -O2 -fdebug-prefix-map=/build/r-base-tRgc13/r-base-4.0.5=. -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -g -c connection.c -o connection.o
gcc -std=gnu99 -I"/usr/share/R/include" -DNDEBUG -I. -I'/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/later/include' -I'/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/Rcpp/include' -fpic -g -O2 -fdebug-prefix-map=/build/r-base-tRgc13/r-base-4.0.5=. -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -g -c consume.c -o consume.o
g++ -std=gnu++11 -I"/usr/share/R/include" -DNDEBUG -I. -I'/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/later/include' -I'/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/Rcpp/include' -fpic -g -O2 -fdebug-prefix-map=/build/r-base-tRgc13/r-base-4.0.5=. -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -g -c consume_later.cpp -o consume_later.o
consume_later.cpp: In function ‘void later_callback(void*)’:
consume_later.cpp:81:17: error: ‘strncmp’ was not declared in this scope
81 | while (elt && strncmp((const char *) elt->tag.bytes,
| ^~~~~~~
consume_later.cpp:10:1: note: ‘strncmp’ is defined in header ‘<cstring>’; did you forget to ‘#include <cstring>’?
9 | #include "utils.h"
+++ |+#include <cstring>
10 |
consume_later.cpp:109:3: error: ‘memcpy’ was not declared in this scope
109 | memcpy((void *) RAW(body), cdata->env->message.body.bytes, body_len);
| ^~~~~~
consume_later.cpp:109:3: note: ‘memcpy’ is defined in header ‘<cstring>’; did you forget to ‘#include <cstring>’?
consume_later.cpp: In function ‘void later_warn_callback(void*)’:
consume_later.cpp:163:7: error: ‘strncpy’ was not declared in this scope
163 | strncpy(tag, (const char *) err->payload.tag.bytes, err->payload.tag.len);
| ^~~~~~~
consume_later.cpp:163:7: note: ‘strncpy’ is defined in header ‘<cstring>’; did you forget to ‘#include <cstring>’?
consume_later.cpp: In function ‘void* consume_run(void*)’:
consume_later.cpp:245:25: error: ‘strncmp’ was not declared in this scope
245 | while (elt && strncmp((const char *) elt->tag.bytes,
| ^~~~~~~
consume_later.cpp:245:25: note: ‘strncmp’ is defined in header ‘<cstring>’; did you forget to ‘#include <cstring>’?
make: *** [/usr/lib/R/etc/Makeconf:181: consume_later.o] Error 1
ERROR: compilation failed for package ‘longears’
* removing ‘/home/bhagwat_chaitanya/R/x86_64-pc-linux-gnu-library/4.0/longears’
Warning message:
In i.p(...) :
installation of package ‘/tmp/RtmpJ07CEI/file414679868170/longears_0.2.4.tar.gz’ had non-zero exit status
To reproduce:
apt-get update && apt-get upgrade
sudo apt install dirmngr gnupg apt-transport-https ca-certificates software-properties-common
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys E298A3A825C0D65DFD57CBB651716619E084DAB9
sudo add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/'
sudo apt install r-base
# (this is version 4.0.5)
sudo apt install librabbitmq-dev
install.packages("remotes")
remotes::install_github("atheriel/longears")
The last step results in the error printed above.