Code Monkey home page Code Monkey logo

async_mqtt's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar

async_mqtt's Issues

FYI, c++20 coroutines example

You can close when you have read this (maybe enable discussions in this repo)

I did not finish the example but here is how you can do coroutines without boost yield

#include <boost/asio.hpp>

#include <async_mqtt/all.hpp>

namespace asio = boost::asio;
namespace am = async_mqtt;

static asio::awaitable<void> make_coro(asio::io_context& ctx) {
  asio::ip::tcp::socket resolve_sock{ctx};
  asio::ip::tcp::resolver res{resolve_sock.get_executor()};
  am::endpoint<am::role::client, am::protocol::mqtt> amep {
    am::protocol_version::v3_1_1,
    ctx.get_executor()
  };
  asio::ip::tcp::resolver::results_type resolved_ip = co_await res.async_resolve("localhost", "1883", asio::use_awaitable);

  [[maybe_unused]] asio::ip::tcp::endpoint endpoint = co_await asio::async_connect(amep.next_layer(), resolved_ip, asio::use_awaitable);

  co_await amep.send(am::v3_1_1::connect_packet{
      true,   // clean_session
      0x1234, // keep_alive
      am::allocate_buffer("cid1"),
      am::nullopt, // will
      am::nullopt, // username set like am::allocate_buffer("user1"),
      am::nullopt  // password set like am::allocate_buffer("pass1")
  }, asio::use_awaitable);

  [[maybe_unused]] am::packet_variant packet_variant = co_await amep.recv(asio::use_awaitable);
  // todo do something with the above packet

  co_await amep.send(am::v3_1_1::publish_packet{ *amep.acquire_unique_packet_id(), am::allocate_buffer("signal_topic"),
                                        am::allocate_buffer("signal_payload"), am::qos::at_least_once }, asio::use_awaitable);

}


int main (int argc, char* argv[]) {

  asio::io_context ctx{};

  asio::co_spawn(ctx, make_coro(ctx), asio::detached);

  ctx.run();

  return EXIT_SUCCESS;
}

Btw, nice library!

Access the value of Session Expiry Interval.

Hi @redboltz , I am trying to access the value of Session Expiry Interval from a connect_packet. I have the following code:

auto conn_packet = std::get_if<async_mqtt::v5::connect_packet>(&packet);

for (auto &p: conn_packet->props()) {
       std::cout << p << std::endl;
       std::cout << p.id() << std::endl;
}

which gives me the following output:

{id:session_expiry_interval,val:0}
session_expiry_interval

but I can't seem to find the function to retrieve the value, i.e. I want to be able to assign 0 to a variable.

How can I do this?

Supporting async when filtering.

    recv(
        filter fil,
        std::set<control_packet_type> types,
        CompletionToken&& token
    ) {

I am trying to filter received packages by their type and was wondering why the function with package filtering doesn't support async as the normal recv function without filtering does support async.

    recv(
        CompletionToken&& token
    ) {

Reason codes

I was working with std::error_codes and reason codes in my application and came across something interesting which might be a good addition to this library. It is possible to define async_mqtt reason codes as std::error_codes in order to make them more accessible.

Example

namespace detail {

class suback_error_category : public std::error_category {
public:
  virtual const char* name() const noexcept override { return "async_mqtt::suback_error_category"; }
  virtual std::string message(int value) const override {
    return suback_reason_code_to_str(static_cast<suback_reason_code>(value));
  }
};

}  // namespace detail

const std::error_category& suback_error_category() {
  static detail::suback_error_category instance;
  return instance;
}

inline std::error_code make_error_code(suback_reason_code v) {
  return std::error_code(static_cast<uint8_t>(v), async_mqtt::suback_error_category());
}

This is just for suback_reason_code to give you the idea. Using this I can call:

    for (auto const& entry : suback_packet.entries()) {
      if (entry != async_mqtt::suback_reason_code::granted_qos_0) {
        logger_.error("Error subscribing to topic: {}, reason code: {}", ncmd_topic,
                      async_mqtt::suback_reason_code_to_str(entry));
        co_return async_mqtt::make_error_code(entry);
      }
    }

and get a std::error_code from the async_mqtt reason codes. Do you like this? I can write for other reason codes and PR if you'd like.

Nullpointer dereference on connected, moved constructed endpoint

Hi I've got a question regarding the behavior I've observed. It may not be a bug or sth, maybe I just use the endpoint in the wrong way. Anyway, I have an endpoint that I connect to a broker, keep alive is set to 15 sec. Once I establish a connection with the broker I move the endpoint variable into the other variable let's call it endpoint2 (it is a simplified example based on my code, just to ease reproduction). Then, using endpoint2 variable I try to receive some packets. After about 15 seconds (the same as keep alive) the code gets crashed.
The "stack trace" is pretty long thus I'm showing only the relevant parts:

endpoint.hpp:138:31: runtime error: member call on null pointer of type 'struct element_type'
pc_0x557f6e551b55###func_async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::strand()###file_/home/bielpa/.priv/repo/hhctrl-mgmt/cmake-build-linux_gcc_debug_integration_tests_sanitizers/deps/async_mqtt-src-src/include/async_mqtt/endpoint.hpp###line_138###obj(home_assistant_it_v2+0x110db55)
pc_0x557f6e859bb9###func_void async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet>::operator()<boost::asio::detail::composed_op<async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet>, boost::asio::detail::composed_work<void ()>, async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::reset_pingreq_send_timer()::{lambda(boost::system::error_code const&)#1}::operator()(boost::system::error_code const&) const::{lambda(async_mqtt::system_error const&)#1}, void (async_mqtt::system_error)> >(boost::asio::detail::composed_op<async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet>, boost::asio::detail::composed_work<void ()>, async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::reset_pingreq_send_timer()::{lambda(boost::system::error_code const&)#1}::operator()(boost::system::error_code const&) const::{lambda(async_mqtt::system_error const&)#1}, void (async_mqtt::system_error)>&, boost::system::error_code const&, unsigned long)###file_/home/bielpa/.priv/repo/hhctrl-mgmt/cmake-build-linux_gcc_debug_integration_tests_sanitizers/deps/async_mqtt-src-src/include/async_mqtt/endpoint.hpp###line_860###obj(home_assistant_it_v2+0x1415bb9)
pc_0x557f6e83b430###func_void boost::asio::detail::composed_op<async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet>, boost::asio::detail::composed_work<void ()>, async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::reset_pingreq_send_timer()::{lambda(boost::system::error_code const&)#1}::operator()(boost::system::error_code const&) const::{lambda(async_mqtt::system_error const&)#1}, void (async_mqtt::system_error)>::operator()<>()###file_/home/bielpa/.conan/data/boost/1.82.0///package/8cc3305c27e5ff838d1c7590662e309638310dfc/include/boost/asio/compose.hpp###line_107###obj_(home_assistant_it_v2+0x13f7430)
pc_0x557f6e823011###func_void boost::asio::detail::initiate_composed_op<void (async_mqtt::system_error), void ()>::operator()<async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::reset_pingreq_send_timer()::{lambda(boost::system::error_code const&)#1}::operator()(boost::system::error_code const&) const::{lambda(async_mqtt::system_error const&)#1}, async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet> >(async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::reset_pingreq_send_timer()::{lambda(boost::system::error_code const&)#1}::operator()(boost::system::error_code const&) const::{lambda(async_mqtt::system_error const&)#1}&&, async_mqtt::basic_endpoint<(async_mqtt::role)1, 2ul, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator, 0ul> > >::send_impl<async_mqtt::v3_1_1::pingreq_packet>&&) const###file_/home/bielpa/.conan/data/boost/1.82.0///package/8cc3305c27e5ff838d1c7590662e309638310dfc/include/boost/asio/compose.hpp###line_275###obj_(home_assistant_it_v2+0x13df011)

I guess that after those 15 seconds the ping timer expires and the endpoint wants to send a ping req, however, due to the previous move operation on the endpoint variable something goes wrong. Thus my question is - Is the move operation on the endpoint that was already connected allowed? Of course, it can be easily mitigated by e.g. allocating the endpoint on the heap, however I would like to hear your opinion.

Below is the code you can use to reproduce this "issue".

#include <catch2/catch_all.hpp>

#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/connect.hpp>
#include <async_mqtt/all.hpp>
#include <spdlog/spdlog.h>

void rethrow(std::exception_ptr ptr)
{
    if (ptr) {
        std::rethrow_exception(ptr);
    }
}

TEST_CASE("Client can connect to broker 2")
{
    auto ioc = boost::asio::io_context{};
    auto ep = async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt> {
        async_mqtt::protocol_version::v3_1_1, ioc.get_executor()
    };

    // NOLINTBEGIN
    boost::asio::co_spawn(ioc.get_executor(), [&ep, &ioc]() mutable -> boost::asio::awaitable<void> {

        // Resolve
        auto resolver = boost::asio::ip::tcp::resolver{ep.next_layer().get_executor()};
        auto eps = co_await resolver.async_resolve("127.0.0.1", "1883", boost::asio::use_awaitable);

        // Connect
        co_await boost::asio::async_connect(
                ep.next_layer(),
                eps,
                boost::asio::use_awaitable);

        // Send connect packet
        auto packet = async_mqtt::v3_1_1::connect_packet {
                true,
                15,
                async_mqtt::allocate_buffer("some_unique_id_1234567"),
                std::nullopt,
                async_mqtt::allocate_buffer("test_user"), // credentials for my broker
                async_mqtt::allocate_buffer("test")
        };

        if (auto system_error = co_await ep.send(std::move(packet), boost::asio::use_awaitable)) {
            throw std::runtime_error{system_error.what()};
        }

        // Recv connack packet
        if (async_mqtt::packet_variant packet_variant = co_await ep.recv(boost::asio::use_awaitable)) {
            packet_variant.visit(
                    async_mqtt::overload{
                            [](const auto&) {
                            }
                    });
        } else {
            throw std::runtime_error{packet_variant.get<async_mqtt::system_error>().message()};
        }

        auto new_ep = std::move(ep);

        // Receive
        while (true) {
            if (auto recv_packet = co_await new_ep.recv(boost::asio::use_awaitable)) {
                auto type = recv_packet.type();
                if (!type) {
                    throw std::runtime_error{"Invalid packet"};
                }
                spdlog::get("async_mqtt_client")->debug("Packet received: {}", async_mqtt::control_packet_type_to_str(type.value()));
            } else {
                throw std::runtime_error{recv_packet.template get<async_mqtt::system_error>().message()};
            }
        }
    }, rethrow);
    // NOLINTEND

    ioc.run();
}

Finding async_mqtt, CMake

Hi there,
there does not seem to be a CMake-macro FindAsyncMQTT.cmake (or similar) that allows searching for the library? Before I try to invent such a macro -- is this an oversight from me?
Kind Regards,
Rüdiger

Send doesn't queue messages and incorrectly reports status of messages.

// using namespace std;
namespace asio = boost::asio;

class test_class {
private:
  asio::io_context& ctx_;
  std::shared_ptr<async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>> mqtt_client;

public:
  test_class(asio::io_context& ctx)
      : ctx_(ctx), mqtt_client(std::make_shared<async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>>(
                       async_mqtt::protocol_version::v3_1_1,
                       ctx_.get_executor())) {
    asio::co_spawn(mqtt_client->strand(), test_func(), asio::detached);
  }

  auto test_func() -> asio::awaitable<void> {
    asio::ip::tcp::socket resolve_sock{ ctx_ };
    asio::ip::tcp::resolver res{ resolve_sock.get_executor() };
    asio::ip::tcp::resolver::results_type resolved_ip = co_await res.async_resolve("localhost", "1883", asio::use_awaitable);

    try {
      asio::ip::tcp::endpoint endpoint =
          co_await asio::async_connect(mqtt_client->next_layer(), resolved_ip, asio::use_awaitable);

      std::cout << endpoint.address().to_string() << '\n';
    } catch (const std::exception& e) {
      std::cout << "Exception during connect: " << e.what() << '\n';
    } catch (...) {
      std::cout << "Unknown exception during connect\n";
    }

    co_await mqtt_client->send(
        async_mqtt::v3_1_1::connect_packet{ true, 0x1234, async_mqtt::allocate_buffer("cid1"), async_mqtt::nullopt,
                                            async_mqtt::nullopt, async_mqtt::nullopt },
        asio::use_awaitable);

    async_mqtt::packet_variant packet_variant = co_await mqtt_client->recv(asio::use_awaitable);

    co_await mqtt_client->send(
        async_mqtt::v3_1_1::publish_packet{ mqtt_client->acquire_unique_packet_id().value(),
                                            async_mqtt::allocate_buffer("test_topic"),
                                            async_mqtt::allocate_buffer("test_payload"), async_mqtt::qos::at_least_once },
        asio::use_awaitable);

    std::string payload = "n";
    while (true) {
      std::cout << "Sending payload: " << payload << "\n";
      auto result = co_await mqtt_client->send(
          async_mqtt::v3_1_1::publish_packet{ mqtt_client->acquire_unique_packet_id().value(),
                                              async_mqtt::allocate_buffer("test_topic"),
                                              async_mqtt::allocate_buffer(payload), async_mqtt::qos::at_least_once },
          asio::use_awaitable);

      if (!result) {
        std::cout << "Successfully sent payload\n";
      } else {
        std::cout << "Failed to send payload\n";
      }

      asio::steady_timer timer{ ctx_, std::chrono::seconds{ 3 } };
      co_await timer.async_wait(asio::use_awaitable);
      payload += "n";
    }
    co_return;
  }

};

auto main() -> int {
  asio::io_context ctx{};
  test_class tester(ctx);
  ctx.run();
  return 0;
}
[2023-06-27 12:09:21.979104] [0x00007fb47e4de780] [info]    send:v3_1_1::connect{cid:cid1,ka:4660,cs:true}
[2023-06-27 12:09:21.979115] [0x00007fb47e4de780] [info]    [store] clear
[2023-06-27 12:09:21.979142] [0x00007fb47e4de780] [info]    recv
[2023-06-27 12:09:21.979174] [0x00007fb47e4de780] [info]    recv:v3_1_1::connack{rc:accepted,sp:false}
[2023-06-27 12:09:21.979178] [0x00007fb47e4de780] [info]    [store] clear
[2023-06-27 12:09:21.979181] [0x00007fb47e4de780] [info]    acquire_unique_packet_id:1
[2023-06-27 12:09:21.979186] [0x00007fb47e4de780] [info]    send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:1}
Sending payload: n
[2023-06-27 12:09:21.979202] [0x00007fb47e4de780] [info]    acquire_unique_packet_id:2
[2023-06-27 12:09:21.979204] [0x00007fb47e4de780] [info]    send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:2}
Successfully sent payload
Sending payload: nn
[2023-06-27 12:09:24.979298] [0x00007fb47e4de780] [info]    acquire_unique_packet_id:3
[2023-06-27 12:09:24.979318] [0x00007fb47e4de780] [info]    send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:3}
Successfully sent payload
Sending payload: nnn
[2023-06-27 12:09:27.979727] [0x00007fb47e4de780] [info]    acquire_unique_packet_id:4
[2023-06-27 12:09:27.979746] [0x00007fb47e4de780] [info]    send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:4}
Successfully sent payload
Sending payload: nnnn
[2023-06-27 12:09:30.980130] [0x00007fb47e4de780] [info]    acquire_unique_packet_id:5
[2023-06-27 12:09:30.980148] [0x00007fb47e4de780] [info]    send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:5}
Successfully sent payload
Sending payload: nnnnn
[2023-06-27 12:09:33.980610] [0x00007fb47e4de780] [info]    acquire_unique_packet_id:6
[2023-06-27 12:09:33.980627] [0x00007fb47e4de780] [info]    send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:6}
[2023-06-27 12:09:33.980655] [0x00007fb47e4de780] [info]    send error:Broken pipe
Failed to send payload
Sending payload: nnnnnn
[2023-06-27 12:09:36.980985] [0x00007fb47e4de780] [info]    acquire_unique_packet_id:7
[2023-06-27 12:09:36.981003] [0x00007fb47e4de780] [info]    send:v3_1_1::publish{topic:test_topic,qos:at_least_once,retain:no,dup:no,pid:7}

image

I encountered this error while working in a larger project but tried to isolate it to the minimum viable example.

In order to test this I am sending a new payload with an added "n" every 3 seconds. The first three arrive succcessfully at the MQTT broker. This is when I turn off the MQTT broker. When this happens the next one reports as being successfully sent (this is incorrect), but all subsequent messages report as being unsuccessfully sent.

Perhaps I am using the library incorrectly but if so, I would like to know where I am incorrect. In a larger codebase I have slightly different code which tries to as forcefully as possible to send the subsequent messages. When this happens the new messages are sent but the missing ones are left out. This would be equivalent to, in this example, to the MQTT broker receiveing:

["n", "nn", "nnn", "nnnnn", "nnnnnn"].

Leaving out the messages when the broker was down.

SHA library

Now, async_mqtt is using picosha2.h.
https://github.com/redboltz/async_mqtt/blob/main/include/async_mqtt/external/picosha2.h

A part of broker code use it. https://github.com/redboltz/async_mqtt/blob/main/include/async_mqtt/broker/security.hpp
It is for sha256 password digest.

struct authentication {
enum class method {
sha256,
plain_password,
client_cert,
anonymous,
unauthenticated
};

If I propose async_mqtt to the Boost Libraries, this external dependency should be solved.

Supporting async when filtering.

    recv(
        filter fil,
        std::set<control_packet_type> types,
        CompletionToken&& token
    ) {

I am trying to filter received packages by their type and was wondering why this function doesn't support async as the normal recv function without filtering does support async.

    recv(
        CompletionToken&& token
    ) {

Build error: "use of deleted function"

Hello,
Thank you for writing this async_mqtt, it looks great!
I'm trying to build version 2.0.0 with GCC 9.3.0. I'm cross-compiling using arm-oe-linux-gnueabi-g++.

The build is failing right from the start:

/arm-oe-linux-gnueabi-g++ --sysroot=/home/bjc/quectel_sensor_sdk/rm520nglaar/ql-ol-extsdk/ql-sysroots -DASYNC_MQTT_USE_TLS -Iasync_mqtt-2.0.0/include -isystem /home/bjc/quectel_sensor_sdk/boost_1_81_0 -march=armv7-a -marm -mfpu=neon -mfloat-abi=hard -Os -DNDEBUG -std=c++2a -MD -MT example/CMakeFiles/ep_cb_mqtt_client.dir/ep_cb_mqtt_client.cpp.o -MF CMakeFiles/ep_cb_mqtt_client.dir/ep_cb_mqtt_client.cpp.o.d -o CMakeFiles/ep_cb_mqtt_client.dir/ep_cb_mqtt_client.cpp.o -c async_mqtt-2.0.0/example/ep_cb_mqtt_client.cpp In file included from async_mqtt-2.0.0/include/async_mqtt/packet/property_variant.hpp:11, from async_mqtt-2.0.0/include/async_mqtt/packet/will.hpp:12, from async_mqtt-2.0.0/include/async_mqtt/packet/v3_1_1_connect.hpp:26, from async_mqtt-2.0.0/include/async_mqtt/packet/packet_variant.hpp:11, from async_mqtt-2.0.0/include/async_mqtt/buffer_to_packet_variant.hpp:12, from async_mqtt-2.0.0/include/async_mqtt/all.hpp:11, from async_mqtt-2.0.0/example/ep_cb_mqtt_client.cpp:12: async_mqtt-2.0.0/include/async_mqtt/packet/property.hpp: In constructor ‘async_mqtt::property::detail::binary_property::binary_property(async_mqtt::property::id, async_mqtt::buffer)’: async_mqtt-2.0.0/include/async_mqtt/packet/property.hpp:142:19: error: use of deleted function ‘async_mqtt::buffer::buffer(const async_mqtt::buffer&)’ 142 | length_(2) // size 2 | ^ In file included from async_mqtt-2.0.0/include/async_mqtt/all.hpp:10, from async_mqtt-2.0.0/example/ep_cb_mqtt_client.cpp:12: async_mqtt-2.0.0/include/async_mqtt/buffer.hpp:48:5: note: ‘async_mqtt::buffer::buffer(const async_mqtt::buffer&) noexcept’ is implicitly deleted because its exception-specification does not match the implicit exception-specification ‘’ 48 | buffer(buffer const& other) noexcept = default; | ^~~~~~

broker auth logic doesn't work well for delivery

It seems that the authorization logic for delivery doesn't seems to work well.

I wrote the following simple test case:

BOOST_AUTO_TEST_CASE(allow_pertial_deny_group) {
    am::security security;
    std::string test = R"*(
        {
            # Configure username/login
            "authentication": [
                {
                    "name": "u1",
                    "method": "plain_password",
                    "password": "hoge"
                }
                ,
                {
                    "name": "u2",
                    "method": "plain_password",
                    "password": "hoge"
                }
            ],
            # Give access to topics
            "authorization": [
                {
                    "topic": "#",
                    "allow": { "pub":["u1"], "sub":["u1"] }
                }
                ,
                {
                    "topic": "#",
                    "deny": { "pub":["u2"], "sub":["u2"] }
                }
            ]
        }
        )*";
    BOOST_CHECK_NO_THROW(load_config(security, test));

    // sub
    BOOST_CHECK(security.is_subscribe_authorized("u1", "topic"));
    BOOST_CHECK(!security.is_subscribe_authorized("u2", "topic"));

    // pub
    BOOST_CHECK(security.auth_pub("topic", "u1") == am::security::authorization::type::allow);
    BOOST_CHECK(security.auth_pub("topic", "u2") == am::security::authorization::type::deny);

    // deliver
    BOOST_CHECK(security.auth_sub_user(security.auth_sub("topic"), "u1") == am::security::authorization::type::allow);
    BOOST_CHECK(security.auth_sub_user(security.auth_sub("topic"), "u2") == am::security::authorization::type::deny);
}

The same code is in https://github.com/redboltz/async_mqtt/pull/30/files

Expected behavior is for u1 pub/sub/deliver all allowed and u2 pub/sub/deliver all denied,.

However, the following check is failed:

    BOOST_CHECK(security.auth_sub_user(security.auth_sub("topic"), "u1") == am::security::authorization::type::allow);

If I commented out the following part of the json, then all checks are passed.

        {
            # Configure username/login
            "authentication": [
                {
                    "name": "u1",
                    "method": "plain_password",
                    "password": "hoge"
                }
                ,
                {
                    "name": "u2",
                    "method": "plain_password",
                    "password": "hoge"
                }
            ],
            # Give access to topics
            "authorization": [
                {
                    "topic": "#",
                    "allow": { "pub":["u1"], "sub":["u1"] }
                }
#                ,
#                {
#                    "topic": "#",
#                    "deny": { "pub":["u2"], "sub":["u2"] }
#                }
            ]
        }

@kleunen any ideas?

Knowing if the connection is open

I have been dealing with this problem for a while now and hoping that you could help me.

What is the best way to know it the connection has died or has been lost? I cannot find the API to get this information.

Receiving packet

So I have the following code for receiving a packet. Sometimes the incorrect packet is received, and I need to know of what type that packet is. Is there a way to know this?

auto publish_recv = mqtt_client_->recv(async_mqtt::filter::match, { async_mqtt::control_packet_type::publish}, asio::use_awaitable)

auto *publish_packet = publish_recv.template get_if<async_mqtt::v5::publish_packet>();

if (publish_packet == nullptr) {
   ...
}

vcpkg version of library

It would be much easier to install this library from vcpkg manager.

I saw you made it at least once for mqtt_cpp

Could not compile under Ubuntu 20.04

I am trying to build this library under Ubuntu 20.04.

Is it because I have to use gcc-12?

The build process fails with following errors:

include/async_mqtt/broker/session_state.hpp:408:22: error: no matching member function for call to 'erase'
    handles_.erase(*handle);

Next one is shorted

async_mqtt/broker/session_state.hpp:682:50: error: no type named 'handle' in 'async_mqtt::multiple_subscription_map<async_mqtt::buffer, ....

    using elem_t = typename sub_con_map<epsp_t>::handle;
async_mqtt/include/async_mqtt/broker/session_state.hpp:417:27: error: no matching member function for call to 'erase'
      subs_map_.erase(h, client_id_);

Boost version

Boost version 1.82.0 bootstrapped

Compiler

clang version 10.0.0-4ubuntu1 
Target: x86_64-pc-linux-gnu
Thread model: posix
InstalledDir: /usr/bin

Steps to reproduce

Actually I took all the commands out from Dockerfile

1. Clone repository

2. Create build folder

mkdir async_mqtt_build && cd ./async_mqtt_build

3. Cmake

cmake \
       -DCMAKE_CXX_COMPILER=clang++ \
       -DCMAKE_BUILD_TYPE=Release \
       -DCMAKE_CXX_FLAGS="-std=c++17 -O3" \
       -DASYNC_MQTT_USE_TLS=ON \
       -DASYNC_MQTT_USE_WS=ON \
       -DASYNC_MQTT_USE_STR_CHECK=OFF \
       -DASYNC_MQTT_USE_LOG=ON \
       -DASYNC_MQTT_BUILD_TOOLS=ON \
       ..

Output is:

-- The C compiler identification is GNU 9.4.0
-- The CXX compiler identification is Clang 10.0.0
-- Detecting C compiler ABI info
-- Detecting C compiler ABI info - done
-- Check for working C compiler: /usr/bin/cc - skipped
-- Detecting C compile features
-- Detecting C compile features - done
-- Detecting CXX compiler ABI info
-- Detecting CXX compiler ABI info - done
-- Check for working CXX compiler: /usr/bin/clang++ - skipped
-- Detecting CXX compile features
-- Detecting CXX compile features - done
-- Setting minimum C++ standard to C++17
-- Dynamically linking with Boost
-- Dynamically linking with Openssl
-- Performing Test CMAKE_HAVE_LIBC_PTHREAD
-- Performing Test CMAKE_HAVE_LIBC_PTHREAD - Failed
-- Looking for pthread_create in pthreads
-- Looking for pthread_create in pthreads - not found
-- Looking for pthread_create in pthread
-- Looking for pthread_create in pthread - found
-- Found Threads: TRUE  
-- TLS enabled
-- WS enabled
-- Logging enabled
-- Found Boost: /usr/local/lib/cmake/Boost-1.82.0/BoostConfig.cmake (found suitable version "1.82.0", minimum required is "1.81.0") found components: log 
-- Found OpenSSL: /usr/lib/x86_64-linux-gnu/libcrypto.so (found version "1.1.1f")  
-- Examples disabled
-- Tools enabled
-- Found Boost: /usr/local/lib/cmake/Boost-1.82.0/BoostConfig.cmake (found suitable version "1.82.0", minimum required is "1.81.0") found components: program_options 
-- Found Doxygen: /usr/bin/doxygen (found version "1.8.17") found components: doxygen missing components: dot
-- Configuring done
-- Generating done
-- Build files have been written to: /home/user/Development/async_mqtt/async_mqtt_build

4. Build library

cmake --build . --target broker bench client_cli

g++11 compile error

Problem

In file included from /var/async_mqtt/tool/broker.cpp:20:
In file included from /var/async_mqtt/include/async_mqtt/broker/broker.hpp:14:
/var/async_mqtt/include/async_mqtt/broker/session_state.hpp:682:50: error: no type named 'handle' in 'async_mqtt::multiple_subscription_map<async_mqtt::buffer, async_mqtt::subscription<async_mqtt::epsp_wrap<std::shared_ptr<async_mqtt::basic_endpoint_variant<async_mqtt::role::server, 2, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>, boost::beast::websocket::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>, true>, boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>>, boost::beast::websocket::stream<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>>, true>>>>>, boost::hash<async_mqtt::buffer>>'
    using elem_t = typename sub_con_map<epsp_t>::handle;
                   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~
/usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/refwrap.h:119:25: note: in instantiation of template class 'async_mqtt::session_state<async_mqtt::epsp_wrap<std::shared_ptr<async_mqtt::basic_endpoint_variant<async_mqtt::role::server, 2, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>, boost::beast::websocket::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>, true>, boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>>, boost::beast::websocket::stream<boost::asio::ssl::stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>>>, true>>>>>' requested here
                                  __void_t<typename _Functor::result_type>>

This happens g++11 (ubuntu 22.04 default) libstdc++. If clang++ uses the g++11 libstdc++ then the same errors could happen.

Solution

Use g++12 or later.

Publisher and Subscriber in the same MQTT client

I want to implement 2 clients with feature of publisher and subscriber in the same client.

Client 1: Publisher will send data in every 1 second and subscriber will listen for any incoming message in another topic.

Client 2: Subscriber will receive any data and as soon as it receives the data it will publish the data to another topic.

Can you provide me with any examples? I am new to this library.

Using MQTT client with std::optional.

This might be due to my own ignorance but I am having trouble initialing a client with std::optional

#include <async_mqtt/all.hpp>
#include <boost/asio.hpp>

auto main(int argc, char* argv[]) -> int {
  std::optional<async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>> mqtt_client_;

  boost::asio::io_context io_ctx{};

  mqtt_client_ =
      async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>{ async_mqtt::protocol_version::v5,
                                                                                  io_ctx.get_executor() };

  io_ctx.run();

  return 0;
}

this results in the compilation error:

/home/magni-the-developer/Documents/framework/vcpkg-sysroot-clang/x64-linux-clang/include/boost/asio/strand.hpp:129:15: error: no viable overloaded '='
    executor_ = BOOST_ASIO_MOVE_CAST(Executor)(other.executor_);
    ~~~~~~~~~ ^ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/home/magni-the-developer/Documents/framework/vcpkg-sysroot-clang/x64-linux-clang/include/async_mqtt/store.hpp:28:7: note: in instantiation of member function 'boost::asio::strand<const boost::asio::io_context::basic_executor_type<std::allocator<void>, 0> &>::operator=' requested here
class store {
      ^
/home/magni-the-developer/Documents/framework/vcpkg-sysroot-clang/x64-linux-clang/include/boost/asio/io_context.hpp:696:24: note: candidate function not viable: 'this' argument has type 'const boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>', but method is not marked const
  basic_executor_type& operator=(
                       ^
/home/magni-the-developer/Documents/framework/vcpkg-sysroot-clang/x64-linux-clang/include/boost/asio/io_context.hpp:701:24: note: candidate function not viable: 'this' argument has type 'const boost::asio::io_context::basic_executor_type<std::allocator<void>, 0>', but method is not marked const
  basic_executor_type& operator=(

Receiving a packet then "get"-ing that packet.

When attempting to recv a certain type of packet the library gives me an exception when decoding its value.

Relevant code:

auto puback_received = co_await mqtt_client_->recv(async_mqtt::filter::match, {async_mqtt::control_packet_type::puback}, asio::use_awaitable);

auto puback_packet = puback_received.template get<async_mqtt::v5::puback_packet>();

std::cout << puback_packet.code() << std::endl;

I am using the function as an asynchronous function co_await-ing its value. Whenever I attempt to do this I get the following error:

 Exception in send: std::get: wrong index for variant

I might be using the get function wrong but I have tried other ways and always have the same error.

Message Poller

Hello,

Sorry for the intrusion.

Just wondering what would be the best way to implement a polling mechanism (after having subscribed to a topic) that would trigger a callback whenever a new message is received.
So far I only found examples that would receive a single message and then disconnect from the broker.
I need to implement an endpoint that would maintain a connection with the broker and that would permanently subscribe to a specific topic and keep receiving messages on said topic.

Thanks in advance.
Elia

Calling recv with filter

I am calling recv with filter like this

auto connack_received = co_await mqtt_client_->recv(async_mqtt::filter::match, { async_mqtt::control_packet_type::connack }, asio::use_awaitable);

and I get a compiler error stating:

/home/magni-the-developer/Documents/framework/vcpkg-sysroot/x64-linux-gcc/include/async_mqtt/endpoint.hpp:442:30: error: no match for call to ‘(boost::asio::use_awaitable_t<>) (std::remove_reference_t<async_mqtt::basic_packet_variant<2>&>)’
  442 |                         token(force_move(pv));
      |                         ~~~~~^~~~~~~~~~~~~~~~

How to properly use acquire_packet_id together with C++ 20 coroutines?

It's not an issue per se, it's rather a question because I kinda don't fully understand how to properly use the acquire_packet_id method. It's probably due to my misunderstanding of how to use it.

Recently I've stumbled upon the error:
td::optional<typename async_mqtt::packet_id_type<PacketIdBytes>::type> async_mqtt::basic_endpoint<Role, PacketIdBytes, NextLayer>::acquire_unique_packet_id() [with async_mqtt::role Role = async_mqtt::role::client; long unsigned int PacketIdBytes = 2; NextLayer = boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::basic_executor_type<std::allocator<void>, 0> >; typename async_mqtt::packet_id_type<PacketIdBytes>::type = short unsigned int]: Assertion strand().running_in_this_thread()' failed.`

It took me to take a close look at the documentation and the code of the library.
There is a part saying that: "f you call those APIs out of strand, then assertion failed.". This statement is followed by a table where one of the entries is: "acquire_unique_packet_id".

Now, let me briefly show (using pseudocode) how I use your client in my code, the code is simple and short so it should be quite easy to understand.

// ClientWrapper - is a wrapper that is using async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt>; under the hood

// rethrow - is a function that just rethrows an exception

class ClientWrapper
{
boost::asio::awaitable<void> async_send(....)
{
    auto packet_id = ep_.acquire_packet_id();
    [...]
}

boost::asio::awaitable<...> async_receive()
{
  [...]
}

private:
async_mqtt::endpoint<async_mqtt::role::client, async_mqtt::protocol::mqtt> ep_;
};


auto ioc = io_context{};
auto client1 =  ClientWrapper{...};
auto client2 = ClientWrapper{...};

boost::asio::co_spawn(ioc, [&client1, &client2]() -> boost::asio::awaitable<void>()
{
  co_await client1.async_send(...);
  co_await client2.async_send(...);
  
  co_await client1.async_receive(...);
}, rethrow);

Notice that I'm using SYNC version of acquire_packet_id.
So, the error I showed before, and the documentation that says that there is an async version acquire_packet_id took me to the other solution where I replaced SYNC call to acquire_packet_id with async version with boost::use_awaitable completition hander.

So my questions are:

  • should I always use the async version of acquire_packet_id in case of using coroutines?
  • is my fix correct?
  • when should I use SYNC version of acquire_packet_id?
  • why is the return value of async call to acquire_packet_id optional? When can it be returned as nullopt?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.