redboltz / async_mqtt Goto Github PK
View Code? Open in Web Editor NEWAsynchronous MQTT communication library based on Boost.Asio
License: Boost Software License 1.0
Asynchronous MQTT communication library based on Boost.Asio
License: Boost Software License 1.0
You can close when you have read this (maybe enable discussions in this repo)
I did not finish the example but here is how you can do coroutines without boost yield
#include <boost/asio.hpp>
#include <async_mqtt/all.hpp>
namespace asio = boost::asio;
namespace am = async_mqtt;
static asio::awaitable<void> make_coro(asio::io_context& ctx) {
asio::ip::tcp::socket resolve_sock{ctx};
asio::ip::tcp::resolver res{resolve_sock.get_executor()};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
am::protocol_version::v3_1_1,
ctx.get_executor()
};
asio::ip::tcp::resolver::results_type resolved_ip = co_await res.async_resolve("localhost", "1883", asio::use_awaitable);
[[maybe_unused]] asio::ip::tcp::endpoint endpoint = co_await asio::async_connect(amep.next_layer(), resolved_ip, asio::use_awaitable);
co_await amep.send(am::v3_1_1::connect_packet{
true, // clean_session
0x1234, // keep_alive
am::allocate_buffer("cid1"),
am::nullopt, // will
am::nullopt, // username set like am::allocate_buffer("user1"),
am::nullopt // password set like am::allocate_buffer("pass1")
}, asio::use_awaitable);
[[maybe_unused]] am::packet_variant packet_variant = co_await amep.recv(asio::use_awaitable);
// todo do something with the above packet
co_await amep.send(am::v3_1_1::publish_packet{ *amep.acquire_unique_packet_id(), am::allocate_buffer("signal_topic"),
am::allocate_buffer("signal_payload"), am::qos::at_least_once }, asio::use_awaitable);
}
int main (int argc, char* argv[]) {
asio::io_context ctx{};
asio::co_spawn(ctx, make_coro(ctx), asio::detached);
ctx.run();
return EXIT_SUCCESS;
}
Btw, nice library!
Hi @redboltz , I am trying to access the value of Session Expiry Interval from a connect_packet. I have the following code:
auto conn_packet = std::get_if<async_mqtt::v5::connect_packet>(&packet);
for (auto &p: conn_packet->props()) {
std::cout << p << std::endl;
std::cout << p.id() << std::endl;
}
which gives me the following output:
{id:session_expiry_interval,val:0}
session_expiry_interval
but I can't seem to find the function to retrieve the value, i.e. I want to be able to assign 0 to a variable.
How can I do this?
recv(
filter fil,
std::set<control_packet_type> types,
CompletionToken&& token
) {
I am trying to filter received packages by their type and was wondering why the function with package filtering doesn't support async as the normal recv function without filtering does support async.
recv(
CompletionToken&& token
) {
I was working with std::error_codes and reason codes in my application and came across something interesting which might be a good addition to this library. It is possible to define async_mqtt reason codes as std::error_codes in order to make them more accessible.
Example
namespace detail {
class suback_error_category : public std::error_category {
public:
virtual const char* name() const noexcept override { return "async_mqtt::suback_error_category"; }
virtual std::string message(int value) const override {
return suback_reason_code_to_str(static_cast<suback_reason_code>(value));
}
};
} // namespace detail
const std::error_category& suback_error_category() {
static detail::suback_error_category instance;
return instance;
}
inline std::error_code make_error_code(suback_reason_code v) {
return std::error_code(static_cast<uint8_t>(v), async_mqtt::suback_error_category());
}
This is just for suback_reason_code to give you the idea. Using this I can call:
for (auto const& entry : suback_packet.entries()) {
if (entry != async_mqtt::suback_reason_code::granted_qos_0) {
logger_.error("Error subscribing to topic: {}, reason code: {}", ncmd_topic,
async_mqtt::suback_reason_code_to_str(entry));
co_return async_mqtt::make_error_code(entry);
}
}
and get a std::error_code from the async_mqtt reason codes. Do you like this? I can write for other reason codes and PR if you'd like.
Hi I've got a question regarding the behavior I've observed. It may not be a bug or sth, maybe I just use the endpoint in the wrong way. Anyway, I have an endpoint that I connect to a broker, keep alive is set to 15 sec. Once I establish a connection with the broker I move the endpoint variable into the other variable let's call it endpoint2 (it is a simplified example based on my code, just to ease reproduction). Then, using endpoint2 variable I try to receive some packets. After about 15 seconds (the same as keep alive) the code gets crashed.
The "stack trace" is pretty long thus I'm showing only the relevant parts:
endpoint.hpp:138:31: runtime error: member call on null pointer of type 'struct element_type'
pc_0x557f6e551b55###func_async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::strand()###file_/home/bielpa/.priv/repo/hhctrl-mgmt/cmake-build-linux_gcc_debug_integration_tests_sanitizers/deps/async_mqtt-src-src/include/async_mqtt/endpoint.hpp###line_138###obj(home_assistant_it_v2+0x110db55)
pc_0x557f6e859bb9###func_void async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet>::operator()<boost::asio::detail::composed_op<async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet>, boost::asio::detail::composed_work<void ()>, async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::reset_pingreq_send_timer()::{lambda(boost::system::error_code const&)#1}::operator()(boost::system::error_code const&) const::{lambda(async_mqtt::system_error const&)#1}, void (async_mqtt::system_error)> >(boost::asio::detail::composed_op<async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet>, boost::asio::detail::composed_work<void ()>, async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::reset_pingreq_send_timer()::{lambda(boost::system::error_code const&)#1}::operator()(boost::system::error_code const&) const::{lambda(async_mqtt::system_error const&)#1}, void (async_mqtt::system_error)>&, boost::system::error_code const&, unsigned long)###file_/home/bielpa/.priv/repo/hhctrl-mgmt/cmake-build-linux_gcc_debug_integration_tests_sanitizers/deps/async_mqtt-src-src/include/async_mqtt/endpoint.hpp###line_860###obj(home_assistant_it_v2+0x1415bb9)
pc_0x557f6e83b430###func_void boost::asio::detail::composed_op<async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet>, boost::asio::detail::composed_work<void ()>, async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::reset_pingreq_send_timer()::{lambda(boost::system::error_code const&)#1}::operator()(boost::system::error_code const&) const::{lambda(async_mqtt::system_error const&)#1}, void (async_mqtt::system_error)>::operator()<>()###file_/home/bielpa/.conan/data/boost/1.82.0///package/8cc3305c27e5ff838d1c7590662e309638310dfc/include/boost/asio/compose.hpp###line_107###obj_(home_assistant_it_v2+0x13f7430)
pc_0x557f6e823011###func_void boost::asio::detail::initiate_composed_op<void (async_mqtt::system_error), void ()>::operator()<async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::reset_pingreq_send_timer()::{lambda(boost::system::error_code const&)#1}::operator()(boost::system::error_code const&) const::{lambda(async_mqtt::system_error const&)#1}, async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet> >(async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::reset_pingreq_send_timer()::{lambda(boost::system::error_code const&)#1}::operator()(boost::system::error_code const&) const::{lambda(async_mqtt::system_error const&)#1}&&, async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet>&&) const###file_/home/bielpa/.conan/data/boost/1.82.0///package/8cc3305c27e5ff838d1c7590662e309638310dfc/include/boost/asio/compose.hpp###line_275###obj_(home_assistant_it_v2+0x13df011)
I guess that after those 15 seconds the ping timer expires and the endpoint wants to send a ping req, however, due to the previous move operation on the endpoint variable something goes wrong. Thus my question is - Is the move operation on the endpoint that was already connected allowed? Of course, it can be easily mitigated by e.g. allocating the endpoint on the heap, however I would like to hear your opinion.
Below is the code you can use to reproduce this "issue".
#include <catch2/catch_all.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/connect.hpp>
#include <async_mqtt/all.hpp>
#include <spdlog/spdlog.h>
void rethrow(std::exception_ptr ptr)
{
if (ptr) {
std::rethrow_exception(ptr);
}
}
TEST_CASE("Client can connect to broker 2")
{
auto ioc = boost::asio::io_context{};
auto ep = async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt> {
async_mqtt::protocol_version::v3_1_1, ioc.get_executor()
};
// NOLINTBEGIN
boost::asio::co_spawn(ioc.get_executor(), [&ep, &ioc]() mutable -> boost::asio::awaitable<void> {
// Resolve
auto resolver = boost::asio::ip::tcp::resolver{ep.next_layer().get_executor()};
auto eps = co_await resolver.async_resolve("127.0.0.1", "1883", boost::asio::use_awaitable);
// Connect
co_await boost::asio::async_connect(
ep.next_layer(),
eps,
boost::asio::use_awaitable);
// Send connect packet
auto packet = async_mqtt::v3_1_1::connect_packet {
true,
15,
async_mqtt::allocate_buffer("some_unique_id_1234567"),
std::nullopt,
async_mqtt::allocate_buffer("test_user"), // credentials for my broker
async_mqtt::allocate_buffer("test")
};
if (auto system_error = co_await ep.send(std::move(packet), boost::asio::use_awaitable)) {
throw std::runtime_error{system_error.what()};
}
// Recv connack packet
if (async_mqtt::packet_variant packet_variant = co_await ep.recv(boost::asio::use_awaitable)) {
packet_variant.visit(
async_mqtt::overload{
[](const auto&) {
}
});
} else {
throw std::runtime_error{packet_variant.get<async_mqtt::system_error>().message()};
}
auto new_ep = std::move(ep);
// Receive
while (true) {
if (auto recv_packet = co_await new_ep.recv(boost::asio::use_awaitable)) {
auto type = recv_packet.type();
if (!type) {
throw std::runtime_error{"Invalid packet"};
}
spdlog::get("async_mqtt_client")->debug("Packet received: {}", async_mqtt::control_packet_type_to_str(type.value()));
} else {
throw std::runtime_error{recv_packet.template get<async_mqtt::system_error>().message()};
}
}
}, rethrow);
// NOLINTEND
ioc.run();
}
Hi there,
there does not seem to be a CMake-macro FindAsyncMQTT.cmake (or similar) that allows searching for the library? Before I try to invent such a macro -- is this an oversight from me?
Kind Regards,
Rüdiger
see cmake-packages for version file and the package config helpers on how to quickly add this file, since there been a few major bumps in ~6 months in this project a version file might be a good thing.
// using namespace std;
namespace asio = boost::asio;
class test_class {
private:
asio::io_context& ctx_;
std::shared_ptr<async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>> mqtt_client;
public:
test_class(asio::io_context& ctx)
: ctx_(ctx), mqtt_client(std::make_shared<async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>>(
async_mqtt::protocol_version::v3_1_1,
ctx_.get_executor())) {
asio::co_spawn(mqtt_client->strand(), test_func(), asio::detached);
}
auto test_func() -> asio::awaitable<void> {
asio::ip::tcp::socket resolve_sock{ ctx_ };
asio::ip::tcp::resolver res{ resolve_sock.get_executor() };
asio::ip::tcp::resolver::results_type resolved_ip = co_await res.async_resolve("localhost", "1883", asio::use_awaitable);
try {
asio::ip::tcp::endpoint endpoint =
co_await asio::async_connect(mqtt_client->next_layer(), resolved_ip, asio::use_awaitable);
std::cout << endpoint.address().to_string() << '\n';
} catch (const std::exception& e) {
std::cout << "Exception during connect: " << e.what() << '\n';
} catch (...) {
std::cout << "Unknown exception during connect\n";
}
co_await mqtt_client->send(
async_mqtt::v3_1_1::connect_packet{ true, 0x1234, async_mqtt::allocate_buffer("cid1"), async_mqtt::nullopt,
async_mqtt::nullopt, async_mqtt::nullopt },
asio::use_awaitable);
async_mqtt::packet_variant packet_variant = co_await mqtt_client->recv(asio::use_awaitable);
co_await mqtt_client->send(
async_mqtt::v3_1_1::publish_packet{ mqtt_client->acquire_unique_packet_id().value(),
async_mqtt::allocate_buffer("test_topic"),
async_mqtt::allocate_buffer("test_payload"), async_mqtt::qos::at_least_once },
asio::use_awaitable);
std::string payload = "n";
while (true) {
std::cout << "Sending payload: " << payload << "\n";
auto result = co_await mqtt_client->send(
async_mqtt::v3_1_1::publish_packet{ mqtt_client->acquire_unique_packet_id().value(),
async_mqtt::allocate_buffer("test_topic"),
async_mqtt::allocate_buffer(payload), async_mqtt::qos::at_least_once },
asio::use_awaitable);
if (!result) {
std::cout << "Successfully sent payload\n";
} else {
std::cout << "Failed to send payload\n";
}
asio::steady_timer timer{ ctx_, std::chrono::seconds{ 3 } };
co_await timer.async_wait(asio::use_awaitable);
payload += "n";
}
co_return;
}
};
auto main() -> int {
asio::io_context ctx{};
test_class tester(ctx);
ctx.run();
return 0;
}
[2023-06-27 12:09:21.979104] [0x00007fb47e4de780] [info] send:v3_1_1::connect{cid:cid1,ka:4660,cs:true}
[2023-06-27 12:09:21.979115] [0x00007fb47e4de780] [info] [store] clear
[2023-06-27 12:09:21.979142] [0x00007fb47e4de780] [info] recv
[2023-06-27 12:09:21.979174] [0x00007fb47e4de780] [info] recv:v3_1_1::connack{rc:accepted,sp:false}
[2023-06-27 12:09:21.979178] [0x00007fb47e4de780] [info] [store] clear
[2023-06-27 12:09:21.979181] [0x00007fb47e4de780] [info] acquire_unique_packet_id:1
[2023-06-27 12:09:21.979186] [0x00007fb47e4de780] [info] send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:1}
Sending payload: n
[2023-06-27 12:09:21.979202] [0x00007fb47e4de780] [info] acquire_unique_packet_id:2
[2023-06-27 12:09:21.979204] [0x00007fb47e4de780] [info] send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:2}
Successfully sent payload
Sending payload: nn
[2023-06-27 12:09:24.979298] [0x00007fb47e4de780] [info] acquire_unique_packet_id:3
[2023-06-27 12:09:24.979318] [0x00007fb47e4de780] [info] send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:3}
Successfully sent payload
Sending payload: nnn
[2023-06-27 12:09:27.979727] [0x00007fb47e4de780] [info] acquire_unique_packet_id:4
[2023-06-27 12:09:27.979746] [0x00007fb47e4de780] [info] send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:4}
Successfully sent payload
Sending payload: nnnn
[2023-06-27 12:09:30.980130] [0x00007fb47e4de780] [info] acquire_unique_packet_id:5
[2023-06-27 12:09:30.980148] [0x00007fb47e4de780] [info] send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:5}
Successfully sent payload
Sending payload: nnnnn
[2023-06-27 12:09:33.980610] [0x00007fb47e4de780] [info] acquire_unique_packet_id:6
[2023-06-27 12:09:33.980627] [0x00007fb47e4de780] [info] send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:6}
[2023-06-27 12:09:33.980655] [0x00007fb47e4de780] [info] send error:Broken pipe
Failed to send payload
Sending payload: nnnnnn
[2023-06-27 12:09:36.980985] [0x00007fb47e4de780] [info] acquire_unique_packet_id:7
[2023-06-27 12:09:36.981003] [0x00007fb47e4de780] [info] send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:7}
I encountered this error while working in a larger project but tried to isolate it to the minimum viable example.
In order to test this I am sending a new payload with an added "n" every 3 seconds. The first three arrive succcessfully at the MQTT broker. This is when I turn off the MQTT broker. When this happens the next one reports as being successfully sent (this is incorrect), but all subsequent messages report as being unsuccessfully sent.
Perhaps I am using the library incorrectly but if so, I would like to know where I am incorrect. In a larger codebase I have slightly different code which tries to as forcefully as possible to send the subsequent messages. When this happens the new messages are sent but the missing ones are left out. This would be equivalent to, in this example, to the MQTT broker receiveing:
["n", "nn", "nnn", "nnnnn", "nnnnnn"].
Leaving out the messages when the broker was down.
Now, async_mqtt is using picosha2.h.
https://github.com/redboltz/async_mqtt/blob/main/include/async_mqtt/external/picosha2.h
A part of broker code use it. https://github.com/redboltz/async_mqtt/blob/main/include/async_mqtt/broker/security.hpp
It is for sha256 password digest.
async_mqtt/include/async_mqtt/broker/security.hpp
Lines 76 to 83 in 8d3ef84
If I propose async_mqtt to the Boost Libraries, this external dependency should be solved.
When brew updates lcov version 1.16 to 2.0, lcov starts failing.
The same issue is reported https://bytemeta.vip/repo/linux-test-project/lcov/issues/220
It seems that there is no solution, so far.
Back to 1.16 is a workaround.
I'm not sure how to install lcov 1.16 using brew.
https://github.com/Homebrew/homebrew-core/blob/master/Formula/lcov.rb
has already been updated to 2.0.
There is no [email protected]
Formula.
ASYNC_MQTT_USE_TLS
requires openssl but the port does not depend on it.
recv(
filter fil,
std::set<control_packet_type> types,
CompletionToken&& token
) {
I am trying to filter received packages by their type and was wondering why this function doesn't support async as the normal recv function without filtering does support async.
recv(
CompletionToken&& token
) {
Hello,
Thank you for writing this async_mqtt, it looks great!
I'm trying to build version 2.0.0 with GCC 9.3.0. I'm cross-compiling using arm-oe-linux-gnueabi-g++.
The build is failing right from the start:
/arm-oe-linux-gnueabi-g++ --sysroot=/home/bjc/quectel_sensor_sdk/rm520nglaar/ql-ol-extsdk/ql-sysroots -DASYNC_MQTT_USE_TLS -Iasync_mqtt-2.0.0/include -isystem /home/bjc/quectel_sensor_sdk/boost_1_81_0 -march=armv7-a -marm -mfpu=neon -mfloat-abi=hard -Os -DNDEBUG -std=c++2a -MD -MT example/CMakeFiles/ep_cb_mqtt_client.dir/ep_cb_mqtt_client.cpp.o -MF CMakeFiles/ep_cb_mqtt_client.dir/ep_cb_mqtt_client.cpp.o.d -o CMakeFiles/ep_cb_mqtt_client.dir/ep_cb_mqtt_client.cpp.o -c async_mqtt-2.0.0/example/ep_cb_mqtt_client.cpp In file included from async_mqtt-2.0.0/include/async_mqtt/packet/property_variant.hpp:11, from async_mqtt-2.0.0/include/async_mqtt/packet/will.hpp:12, from async_mqtt-2.0.0/include/async_mqtt/packet/v3_1_1_connect.hpp:26, from async_mqtt-2.0.0/include/async_mqtt/packet/packet_variant.hpp:11, from async_mqtt-2.0.0/include/async_mqtt/buffer_to_packet_variant.hpp:12, from async_mqtt-2.0.0/include/async_mqtt/all.hpp:11, from async_mqtt-2.0.0/example/ep_cb_mqtt_client.cpp:12: async_mqtt-2.0.0/include/async_mqtt/packet/property.hpp: In constructor ‘async_mqtt::property::detail::binary_property::binary_property(async_mqtt::property::id, async_mqtt::buffer)’: async_mqtt-2.0.0/include/async_mqtt/packet/property.hpp:142:19: error: use of deleted function ‘async_mqtt::buffer::buffer(const async_mqtt::buffer&)’ 142 | length_(2) // size 2 | ^ In file included from async_mqtt-2.0.0/include/async_mqtt/all.hpp:10, from async_mqtt-2.0.0/example/ep_cb_mqtt_client.cpp:12: async_mqtt-2.0.0/include/async_mqtt/buffer.hpp:48:5: note: ‘async_mqtt::buffer::buffer(const async_mqtt::buffer&) noexcept’ is implicitly deleted because its exception-specification does not match the implicit exception-specification ‘’ 48 | buffer(buffer const& other) noexcept = default; | ^~~~~~
It seems that the authorization logic for delivery doesn't seems to work well.
I wrote the following simple test case:
BOOST_AUTO_TEST_CASE(allow_pertial_deny_group) {
am::security security;
std::string test = R"*(
{
# Configure username/login
"authentication": [
{
"name": "u1",
"method": "plain_password",
"password": "hoge"
}
,
{
"name": "u2",
"method": "plain_password",
"password": "hoge"
}
],
# Give access to topics
"authorization": [
{
"topic": "#",
"allow": { "pub":["u1"], "sub":["u1"] }
}
,
{
"topic": "#",
"deny": { "pub":["u2"], "sub":["u2"] }
}
]
}
)*";
BOOST_CHECK_NO_THROW(load_config(security, test));
// sub
BOOST_CHECK(security.is_subscribe_authorized("u1", "topic"));
BOOST_CHECK(!security.is_subscribe_authorized("u2", "topic"));
// pub
BOOST_CHECK(security.auth_pub("topic", "u1") == am::security::authorization::type::allow);
BOOST_CHECK(security.auth_pub("topic", "u2") == am::security::authorization::type::deny);
// deliver
BOOST_CHECK(security.auth_sub_user(security.auth_sub("topic"), "u1") == am::security::authorization::type::allow);
BOOST_CHECK(security.auth_sub_user(security.auth_sub("topic"), "u2") == am::security::authorization::type::deny);
}
The same code is in https://github.com/redboltz/async_mqtt/pull/30/files
Expected behavior is for u1 pub/sub/deliver all allowed and u2 pub/sub/deliver all denied,.
However, the following check is failed:
BOOST_CHECK(security.auth_sub_user(security.auth_sub("topic"), "u1") == am::security::authorization::type::allow);
If I commented out the following part of the json, then all checks are passed.
{
# Configure username/login
"authentication": [
{
"name": "u1",
"method": "plain_password",
"password": "hoge"
}
,
{
"name": "u2",
"method": "plain_password",
"password": "hoge"
}
],
# Give access to topics
"authorization": [
{
"topic": "#",
"allow": { "pub":["u1"], "sub":["u1"] }
}
# ,
# {
# "topic": "#",
# "deny": { "pub":["u2"], "sub":["u2"] }
# }
]
}
@kleunen any ideas?
I have been dealing with this problem for a while now and hoping that you could help me.
What is the best way to know it the connection has died or has been lost? I cannot find the API to get this information.
So I have the following code for receiving a packet. Sometimes the incorrect packet is received, and I need to know of what type that packet is. Is there a way to know this?
auto publish_recv = mqtt_client_->recv(async_mqtt::filter::match, { async_mqtt::control_packet_type::publish}, asio::use_awaitable)
auto *publish_packet = publish_recv.template get_if<async_mqtt::v5::publish_packet>();
if (publish_packet == nullptr) {
...
}
It would be much easier to install this library from vcpkg manager.
I saw you made it at least once for mqtt_cpp
I am trying to build this library under Ubuntu 20.04.
Is it because I have to use gcc-12?
The build process fails with following errors:
include/async_mqtt/broker/session_state.hpp:408:22: error: no matching member function for call to 'erase'
handles_.erase(*handle);
Next one is shorted
async_mqtt/broker/session_state.hpp:682:50: error: no type named 'handle' in 'async_mqtt::multiple_subscription_map<async_mqtt::buffer, ....
using elem_t = typename sub_con_map<epsp_t>::handle;
async_mqtt/include/async_mqtt/broker/session_state.hpp:417:27: error: no matching member function for call to 'erase'
subs_map_.erase(h, client_id_);
Boost version 1.82.0
bootstrapped
clang version 10.0.0-4ubuntu1
Target: x86_64-pc-linux-gnu
Thread model: posix
InstalledDir: /usr/bin
Actually I took all the commands out from Dockerfile
mkdir async_mqtt_build && cd ./async_mqtt_build
cmake \
-DCMAKE_CXX_COMPILER=clang++ \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_CXX_FLAGS="-std=c++17 -O3" \
-DASYNC_MQTT_USE_TLS=ON \
-DASYNC_MQTT_USE_WS=ON \
-DASYNC_MQTT_USE_STR_CHECK=OFF \
-DASYNC_MQTT_USE_LOG=ON \
-DASYNC_MQTT_BUILD_TOOLS=ON \
..
Output is:
-- The C compiler identification is GNU 9.4.0
-- The CXX compiler identification is Clang 10.0.0
-- Detecting C compiler ABI info
-- Detecting C compiler ABI info - done
-- Check for working C compiler: /usr/bin/cc - skipped
-- Detecting C compile features
-- Detecting C compile features - done
-- Detecting CXX compiler ABI info
-- Detecting CXX compiler ABI info - done
-- Check for working CXX compiler: /usr/bin/clang++ - skipped
-- Detecting CXX compile features
-- Detecting CXX compile features - done
-- Setting minimum C++ standard to C++17
-- Dynamically linking with Boost
-- Dynamically linking with Openssl
-- Performing Test CMAKE_HAVE_LIBC_PTHREAD
-- Performing Test CMAKE_HAVE_LIBC_PTHREAD - Failed
-- Looking for pthread_create in pthreads
-- Looking for pthread_create in pthreads - not found
-- Looking for pthread_create in pthread
-- Looking for pthread_create in pthread - found
-- Found Threads: TRUE
-- TLS enabled
-- WS enabled
-- Logging enabled
-- Found Boost: /usr/local/lib/cmake/Boost-1.82.0/BoostConfig.cmake (found suitable version "1.82.0", minimum required is "1.81.0") found components: log
-- Found OpenSSL: /usr/lib/x86_64-linux-gnu/libcrypto.so (found version "1.1.1f")
-- Examples disabled
-- Tools enabled
-- Found Boost: /usr/local/lib/cmake/Boost-1.82.0/BoostConfig.cmake (found suitable version "1.82.0", minimum required is "1.81.0") found components: program_options
-- Found Doxygen: /usr/bin/doxygen (found version "1.8.17") found components: doxygen missing components: dot
-- Configuring done
-- Generating done
-- Build files have been written to: /home/user/Development/async_mqtt/async_mqtt_build
cmake --build . --target broker bench client_cli
In file included from /var/async_mqtt/tool/broker.cpp:20:
In file included from /var/async_mqtt/include/async_mqtt/broker/broker.hpp:14:
/var/async_mqtt/include/async_mqtt/broker/session_state.hpp:682:50: error: no type named 'handle' in 'async_mqtt::multiple_subscription_map<async_mqtt::buffer, async_mqtt::subscription<async_mqtt::epsp_wrap<std::shared_ptr<async_mqtt::basic_endpoint_variant<async_mqtt::role::server, 2, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>, boost::beast::websocket::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>, true>, boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>>, boost::beast::websocket::stream<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>>, true>>>>>, boost::hash<async_mqtt::buffer>>'
using elem_t = typename sub_con_map<epsp_t>::handle;
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~
/usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/refwrap.h:119:25: note: in instantiation of template class 'async_mqtt::session_state<async_mqtt::epsp_wrap<std::shared_ptr<async_mqtt::basic_endpoint_variant<async_mqtt::role::server, 2, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>, boost::beast::websocket::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>, true>, boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>>, boost::beast::websocket::stream<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>>, true>>>>>' requested here
__void_t<typename _Functor::result_type>>
This happens g++11 (ubuntu 22.04 default) libstdc++. If clang++ uses the g++11 libstdc++ then the same errors could happen.
Use g++12 or later.
I want to implement 2 clients with feature of publisher and subscriber in the same client.
Client 1: Publisher will send data in every 1 second and subscriber will listen for any incoming message in another topic.
Client 2: Subscriber will receive any data and as soon as it receives the data it will publish the data to another topic.
Can you provide me with any examples? I am new to this library.
This might be due to my own ignorance but I am having trouble initialing a client with std::optional
#include <async_mqtt/all.hpp>
#include <boost/asio.hpp>
auto main(int argc, char* argv[]) -> int {
std::optional<async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>> mqtt_client_;
boost::asio::io_context io_ctx{};
mqtt_client_ =
async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>{ async_mqtt::protocol_version::v5,
io_ctx.get_executor() };
io_ctx.run();
return 0;
}
this results in the compilation error:
/home/magni-the-developer/Documents/framework/vcpkg-sysroot-clang/x64-linux-clang/include/boost/asio/strand.hpp:129:15: error: no viable overloaded '='
executor_ = BOOST_ASIO_MOVE_CAST(Executor)(other.executor_);
~~~~~~~~~ ^ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/home/magni-the-developer/Documents/framework/vcpkg-sysroot-clang/x64-linux-clang/include/async_mqtt/store.hpp:28:7: note: in instantiation of member function 'boost::asio::strand<const boost::asio::io_context::basic_executor_type<std::allocator<void>, 0> &>::operator=' requested here
class store {
^
/home/magni-the-developer/Documents/framework/vcpkg-sysroot-clang/x64-linux-clang/include/boost/asio/io_context.hpp:696:24: note: candidate function not viable: 'this' argument has type 'const boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>', but method is not marked const
basic_executor_type& operator=(
^
/home/magni-the-developer/Documents/framework/vcpkg-sysroot-clang/x64-linux-clang/include/boost/asio/io_context.hpp:701:24: note: candidate function not viable: 'this' argument has type 'const boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>', but method is not marked const
basic_executor_type& operator=(
When attempting to recv a certain type of packet the library gives me an exception when decoding its value.
Relevant code:
auto puback_received = co_await mqtt_client_->recv(async_mqtt::filter::match, {async_mqtt::control_packet_type::puback}, asio::use_awaitable);
auto puback_packet = puback_received.template get<async_mqtt::v5::puback_packet>();
std::cout << puback_packet.code() << std::endl;
I am using the function as an asynchronous function co_await
-ing its value. Whenever I attempt to do this I get the following error:
Exception in send: std::get: wrong index for variant
I might be using the get
function wrong but I have tried other ways and always have the same error.
Hello,
Sorry for the intrusion.
Just wondering what would be the best way to implement a polling mechanism (after having subscribed to a topic) that would trigger a callback whenever a new message is received.
So far I only found examples that would receive a single message and then disconnect from the broker.
I need to implement an endpoint that would maintain a connection with the broker and that would permanently subscribe to a specific topic and keep receiving messages on said topic.
Thanks in advance.
Elia
I am calling recv with filter like this
auto connack_received = co_await mqtt_client_->recv(async_mqtt::filter::match, { async_mqtt::control_packet_type::connack }, asio::use_awaitable);
and I get a compiler error stating:
/home/magni-the-developer/Documents/framework/vcpkg-sysroot/x64-linux-gcc/include/async_mqtt/endpoint.hpp:442:30: error: no match for call to ‘(boost::asio::use_awaitable_t<>) (std::remove_reference_t<async_mqtt::basic_packet_variant<2>&>)’
442 | token(force_move(pv));
| ~~~~~^~~~~~~~~~~~~~~~
It's not an issue per se, it's rather a question because I kinda don't fully understand how to properly use the acquire_packet_id method. It's probably due to my misunderstanding of how to use it.
Recently I've stumbled upon the error:
td::optional<typename async_mqtt::packet_id_type<PacketIdBytes>::type> async_mqtt::basic_endpoint<Role, PacketIdBytes, NextLayer>::acquire_unique_packet_id() [with async_mqtt::role Role = async_mqtt::role::client; long unsigned int PacketIdBytes = 2; NextLayer = boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0> >; typename async_mqtt::packet_id_type<PacketIdBytes>::type = short unsigned int]: Assertion
strand().running_in_this_thread()' failed.`
It took me to take a close look at the documentation and the code of the library.
There is a part saying that: "f you call those APIs out of strand, then assertion failed.". This statement is followed by a table where one of the entries is: "acquire_unique_packet_id".
Now, let me briefly show (using pseudocode) how I use your client in my code, the code is simple and short so it should be quite easy to understand.
// ClientWrapper - is a wrapper that is using async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>; under the hood
// rethrow - is a function that just rethrows an exception
class ClientWrapper
{
boost::asio::awaitable<void> async_send(....)
{
auto packet_id = ep_.acquire_packet_id();
[...]
}
boost::asio::awaitable<...> async_receive()
{
[...]
}
private:
async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt> ep_;
};
auto ioc = io_context{};
auto client1 = ClientWrapper{...};
auto client2 = ClientWrapper{...};
boost::asio::co_spawn(ioc, [&client1, &client2]() -> boost::asio::awaitable<void>()
{
co_await client1.async_send(...);
co_await client2.async_send(...);
co_await client1.async_receive(...);
}, rethrow);
Notice that I'm using SYNC version of acquire_packet_id.
So, the error I showed before, and the documentation that says that there is an async version acquire_packet_id took me to the other solution where I replaced SYNC call to acquire_packet_id with async version with boost::use_awaitable completition hander.
So my questions are:
When I run clang-tidy on my code, I get a warning regarding the basic_publish_packet constructor. Looking at the code that is checked in at main right now; first a move of topic_name is done at row 56. Later on topic_name.size() is executed at row 71.
I believe that topic_name_.size() should be called instead.
Regards,
Christian
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.