Code Monkey home page Code Monkey logo

Comments (18)

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

Can you zip your project (at least the minimum required to show the issue) and post it here ?
I'll take a look.

from emqtt5.

yunglook avatar yunglook commented on July 25, 2024

eMQTT5 project.zip

There you go. Note that this project will not be runnable, because it doesn't include the entire project. I cannot share the entire project due to its size and company restrictions.

The error occurs in the following test case:
The application subscribes to the MQTT broker and after 1 second it unsubscribes.

Whenever I am not sending messages (with a separate client), eMQTT5 gets stuck in the while loop, printing the error above.
Whenever I am sending multiple messages, it only handles a single message in the 1 second period of being subscribed.

I hope this will be of assistance.

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

Can you increase all the log level and post them all here ?

Few ideas from reading your code:

  1. connect should leave the socket as blocking with a timeout on receive and send (so it's easier to deal with than doing a select(...timeout) & read loop). I'm pretty sure MbedTLS does not want a non-blocking socket by itself.
  2. Can you add to your logs the value of r, buffer and ret in MBTLSSocket::recv so we get an idea of what MBedTLS is returning (typical log pattern: ESP_LOGI("RECV", "Received: %d, r:%d, ret:%d, buf:%.*s", 1 or 2 or 3..., r, ret, ret > 0 ? ret : 0, buffer)) ?
  3. Similarly in receiveControlPacket() please log the variable values (ret and r)

Please notice that's perfectly ok to run the event loop in a task, as long as you don't publish from another task at the same time (or you need to use a mutex to prevent both using the socket at the same time), that's how I'm using it.

Also, have you changed the definition of MQTTLock / Lock and ScopeLock (to use FreeRTOS's mutex) ? Else it's very slow on ESP32.

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

Just a thought here, but the log above looks like normal usage of mbed_ssl_read. I'm asking for 5 bytes buffer and mbed just says it doesn't have them. It shouldn't cause any issue by itself. However, the log doesn't match the mbedtls source code that I have (for example, mbedtls_ssl_read is in ssl_msg.c and it's located around line 5300/5600). So maybe there's a mismatch between what's in esp-idf and your implementation.

Also, this lines 523: #include "mbedtls/esp_debug.h" in the middle of a class definition is very dangerous here (not sure how you ended up with something that build). You should probably move all include directive at the top of the file.

When you said you've tried with non blocking socket, you meant with mbedtls_net_set_block and not just fcntl ?
Because I'm using the bare minimum as BIO functions here (those implemented by mbed, not mine), so if you touch the underlying socket object by yourself, you're likely to break the logic in mbed.

from emqtt5.

yunglook avatar yunglook commented on July 25, 2024

Also, have you changed the definition of MQTTLock / Lock and ScopeLock (to use FreeRTOS's mutex) ? Else it's very slow on ESP32

No, I haven't changed that yet. Do I have to change the functions' (acquire, release, etc.) implementation to call similar FreeRTOS mutex functions (lock, unlock)?
I'm looping the eventLoop whenever the client is subscribed, and exit the loop when its unsubscribed.

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

This is what I have in my own code:

// We need FreeRTOS stuff
#include "freertos/FreeRTOS.h"
#include "freertos/semphr.h"

/** Convert millisecond to FreeRTOS' ticks */
static unsigned fromMs(const unsigned waitMillisecond)
{
    return waitMillisecond == (unsigned)-1 ? portMAX_DELAY : (waitMillisecond / portTICK_PERIOD_MS);
}

/** A very simple, but complete non-recursive mutex class */
class Mutex
{
private:
    StaticSemaphore_t buffer;
    SemaphoreHandle_t sem;

public:
    /** Construct a mutex */
    Mutex() : sem(xSemaphoreCreateMutexStatic(&buffer)) {}

    /** Acquire a mutex and wait for the given timeout.
        @param waitMillisecond  The time to wait in milliseconds
        @return true if the mutex was held */
    bool acquire(unsigned waitMillisecond = (unsigned)-1)
    {
        return xSemaphoreTake(sem, fromMs(waitMillisecond)) == pdTRUE;
    }
    /** Release a mutex.
        @return true upon successful releasing */
    bool release() { return xSemaphoreGive(sem) == pdTRUE; }
};

/** A scoped lock.
    A RAII class for mutex locking on construction and unlocking upon destruction. */
struct ScopedLock
{
    Mutex & mutex;
    /** Hold a mutex upon construction */
    ScopedLock(Mutex & mutex) : mutex(mutex) { mutex.acquire(); }
    ~ScopedLock() { mutex.release(); }
};

#define MQTTLock        Mutex
#define MQTTScopedLock  ScopedLock

from emqtt5.

yunglook avatar yunglook commented on July 25, 2024

Output when running eventLoop without receiving publishes:

I (41345) EVENTLOOP: -1
I (41345) mbedtls: ssl_tls.c:8805 => write

I (41345) mbedtls: ssl_tls.c:3445 => write record

I (41355) mbedtls: ssl_tls.c:1446 => encrypt buf

I (41365) mbedtls: ssl_tls.c:1780 <= encrypt buf

I (41365) mbedtls: ssl_tls.c:2847 => flush output

I (41375) mbedtls: ssl_tls.c:2866 message length: 55, out_left: 55

I (41385) mbedtls: ssl_tls.c:2871 ssl->f_send() returned 55 (-0xffffffc9)

I (41385) mbedtls: ssl_tls.c:2899 <= flush output

I (41395) mbedtls: ssl_tls.c:3578 <= write record

I (41395) mbedtls: ssl_tls.c:8833 <= write

I (41405) mbedtls: ssl_tls.c:8389 => read

I (41405) mbedtls: ssl_tls.c:4419 => read record

I (41415) mbedtls: ssl_tls.c:2628 => fetch input

I (41415) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

I (41825) mbedtls: ssl_tls.c:2813 in_left: 0, nb_want: 5

I (41825) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 5 (-0xfffffffb)

I (41825) mbedtls: ssl_tls.c:2834 <= fetch input

I (41835) mbedtls: ssl_tls.c:2628 => fetch input

I (41845) mbedtls: ssl_tls.c:2789 in_left: 5, nb_want: 35

I (41845) mbedtls: ssl_tls.c:2813 in_left: 5, nb_want: 35

I (41855) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 30 (-0xffffffe2)

I (41865) mbedtls: ssl_tls.c:2834 <= fetch input

I (41865) mbedtls: ssl_tls.c:1943 => decrypt buf

I (41875) mbedtls: ssl_tls.c:2468 <= decrypt buf

I (41875) mbedtls: ssl_tls.c:4493 <= read record

I (41885) mbedtls: ssl_tls.c:8681 <= read

I (41885) RECV: Received: 1, r:2, ret:2, buf:�
I (41895) mbedtls: ssl_tls.c:8389 => read

I (41895) mbedtls: ssl_tls.c:8681 <= read

I (41905) RECV: Received: 1, r:1, ret:1, buf:
I (41905) mbedtls: ssl_tls.c:8389 => read

I (41915) mbedtls: ssl_tls.c:8681 <= read

I (41915) RECV: Received: 1, r:3, ret:3, buf:
I (41925) receiveControlPacket: Received: ret:3, r:1
I (41925) EVENTLOOP: -0.5
I (41935) EVENTLOOP: 0
I (41935) EVENTLOOP: 01
I (41935) EVENTLOOP: 02
I (41945) mbedtls: ssl_tls.c:8389 => read

I (41945) mbedtls: ssl_tls.c:4419 => read record

I (41955) mbedtls: ssl_tls.c:2628 => fetch input

I (41955) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

I (44965) mbedtls: ssl_tls.c:2813 in_left: 0, nb_want: 5

I (44965) mbedtls: ssl_tls.c:8389 => read

I (44965) mbedtls: ssl_tls.c:4419 => read record

I (44965) mbedtls: ssl_tls.c:2628 => fetch input

I (44975) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

I (47975) mbedtls: ssl_tls.c:2813 in_left: 0, nb_want: 5

I (47975) mbedtls: ssl_tls.c:8389 => read

I (47975) mbedtls: ssl_tls.c:4419 => read record

I (47975) mbedtls: ssl_tls.c:2628 => fetch input

I (47985) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

Output when running eventLoop when receiving publishMessages:

I (223015) EVENTLOOP: -1
I (223015) mbedtls: ssl_tls.c:8805 => write

I (223015) mbedtls: ssl_tls.c:3445 => write record

I (223015) mbedtls: ssl_tls.c:1446 => encrypt buf

I (223025) mbedtls: ssl_tls.c:1780 <= encrypt buf

I (223025) mbedtls: ssl_tls.c:2847 => flush output

I (223035) mbedtls: ssl_tls.c:2866 message length: 55, out_left: 55

I (223045) mbedtls: ssl_tls.c:2871 ssl->f_send() returned 55 (-0xffffffc9)

I (223045) mbedtls: ssl_tls.c:2899 <= flush output

I (223055) mbedtls: ssl_tls.c:3578 <= write record

I (223065) mbedtls: ssl_tls.c:8833 <= write

I (223065) mbedtls: ssl_tls.c:8389 => read

I (223075) mbedtls: ssl_tls.c:4419 => read record

I (223075) mbedtls: ssl_tls.c:2628 => fetch input

I (223085) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

I (223085) mbedtls: ssl_tls.c:2813 in_left: 0, nb_want: 5

I (223095) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 5 (-0xfffffffb)

I (223105) mbedtls: ssl_tls.c:2834 <= fetch input

I (223105) mbedtls: ssl_tls.c:2628 => fetch input

I (223115) mbedtls: ssl_tls.c:2789 in_left: 5, nb_want: 35

I (223125) mbedtls: ssl_tls.c:2813 in_left: 5, nb_want: 35

I (223125) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 30 (-0xffffffe2)

I (223135) mbedtls: ssl_tls.c:2834 <= fetch input

I (223145) mbedtls: ssl_tls.c:1943 => decrypt buf

I (223145) mbedtls: ssl_tls.c:2468 <= decrypt buf

I (223155) mbedtls: ssl_tls.c:4493 <= read record

I (223155) mbedtls: ssl_tls.c:8681 <= read

I (223165) RECV: Received: 1, r:2, ret:2, buf:�
I (223165) mbedtls: ssl_tls.c:8389 => read

I (223175) mbedtls: ssl_tls.c:8681 <= read

I (223175) RECV: Received: 1, r:1, ret:1, buf:
I (223185) mbedtls: ssl_tls.c:8389 => read

I (223185) mbedtls: ssl_tls.c:8681 <= read

I (223195) RECV: Received: 1, r:3, ret:3, buf:
I (223195) receiveControlPacket: Received: ret:3, r:1
I (223205) EVENTLOOP: -0.5
I (223205) EVENTLOOP: 0
I (223215) EVENTLOOP: 01
I (223215) EVENTLOOP: 02
I (223215) mbedtls: ssl_tls.c:8389 => read

I (223225) mbedtls: ssl_tls.c:4419 => read record

I (223225) mbedtls: ssl_tls.c:2628 => fetch input

I (223235) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

I (223235) mbedtls: ssl_tls.c:2813 in_left: 0, nb_want: 5

I (223245) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 5 (-0xfffffffb)

I (223255) mbedtls: ssl_tls.c:2834 <= fetch input

I (223255) mbedtls: ssl_tls.c:2628 => fetch input

I (223265) mbedtls: ssl_tls.c:2789 in_left: 5, nb_want: 152

I (223275) mbedtls: ssl_tls.c:2813 in_left: 5, nb_want: 152

I (223275) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 147 (-0xffffff6d)

I (223285) mbedtls: ssl_tls.c:2834 <= fetch input

I (223295) mbedtls: ssl_tls.c:1943 => decrypt buf

I (223295) mbedtls: ssl_tls.c:2468 <= decrypt buf

I (223305) mbedtls: ssl_tls.c:4493 <= read record

I (223305) mbedtls: ssl_tls.c:8681 <= read

I (223315) RECV: Received: 1, r:2, ret:2, buf:0y
I (223315) mbedtls: ssl_tls.c:8389 => read

I (223325) mbedtls: ssl_tls.c:8681 <= read

I (223325) RECV: Received: 1, r:1, ret:1, buf:
I (223335) mbedtls: ssl_tls.c:8389 => read

I (223335) mbedtls: ssl_tls.c:8681 <= read

I (223345) RECV: Received: 1, r:120, ret:120, buf:topic/benchmarking
I (223355) receiveControlPacket: Received: ret:120, r:1
I (223355) EVENTLOOP: 03
I (223365) EVENTLOOP: 04
I (223365) EVENTLOOP: 05
I (223365) EVENTLOOP: 1
I (223365) EVENTLOOP: 2
I (223375) EVENTLOOP: 3
I (223375) eMQTT5: Msg received: (0000)
I (223385) eMQTT5:   Topic: topic/benchmarking
I (223385) eMQTT5:   Payload: This is a message with the size of 100 bytes, it will be sent to a topic for MQTT Benchmark Service.
I (223395) EVENTLOOP: 4
I (223405) EVENTLOOP: -1
I (223405) EVENTLOOP: -0.5
I (223405) EVENTLOOP: 0
I (223415) EVENTLOOP: 01
I (223415) EVENTLOOP: 02
I (223415) mbedtls: ssl_tls.c:8389 => read

I (223425) mbedtls: ssl_tls.c:4419 => read record

I (223425) mbedtls: ssl_tls.c:2628 => fetch input

I (223435) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

I (223445) mbedtls: ssl_tls.c:2813 in_left: 0, nb_want: 5

I (223445) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 5 (-0xfffffffb)

I (223455) mbedtls: ssl_tls.c:2834 <= fetch input

I (223455) mbedtls: ssl_tls.c:2628 => fetch input

I (223465) mbedtls: ssl_tls.c:2789 in_left: 5, nb_want: 152

I (223475) mbedtls: ssl_tls.c:2813 in_left: 5, nb_want: 152

I (223475) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 147 (-0xffffff6d)

I (223485) mbedtls: ssl_tls.c:2834 <= fetch input

I (223495) mbedtls: ssl_tls.c:1943 => decrypt buf

I (223495) mbedtls: ssl_tls.c:2468 <= decrypt buf

I (223505) mbedtls: ssl_tls.c:4493 <= read record

I (223505) mbedtls: ssl_tls.c:8681 <= read

I (223515) RECV: Received: 1, r:2, ret:2, buf:0y
I (223515) mbedtls: ssl_tls.c:8389 => read

I (223525) mbedtls: ssl_tls.c:8681 <= read

I (223525) RECV: Received: 1, r:1, ret:1, buf:
I (223535) mbedtls: ssl_tls.c:8389 => read

I (223535) mbedtls: ssl_tls.c:8681 <= read

I (223545) RECV: Received: 1, r:120, ret:120, buf:topic/benchmarking
I (223555) receiveControlPacket: Received: ret:120, r:1
I (223555) EVENTLOOP: 03
I (223565) EVENTLOOP: 04
I (223565) EVENTLOOP: 05
I (223565) EVENTLOOP: 1
I (223565) EVENTLOOP: 2
I (223575) EVENTLOOP: 3
I (223575) eMQTT5: Msg received: (0000)
I (223585) eMQTT5:   Topic: topic/benchmarking
I (223585) eMQTT5:   Payload: This is a message with the size of 100 bytes, it will be sent to a topic for MQTT Benchmark Service.
I (223595) EVENTLOOP: 4
I (223605) EVENTLOOP: -1
I (223605) EVENTLOOP: -0.5
I (223605) EVENTLOOP: 0
I (223615) EVENTLOOP: 01
I (223615) EVENTLOOP: 02
I (223615) mbedtls: ssl_tls.c:8389 => read

I (223625) mbedtls: ssl_tls.c:4419 => read record

I (223625) mbedtls: ssl_tls.c:2628 => fetch input

I (223635) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

I (223645) mbedtls: ssl_tls.c:2813 in_left: 0, nb_want: 5

I (223645) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 5 (-0xfffffffb)

I (223655) mbedtls: ssl_tls.c:2834 <= fetch input

I (223665) mbedtls: ssl_tls.c:2628 => fetch input

I (223665) mbedtls: ssl_tls.c:2789 in_left: 5, nb_want: 152

I (223675) mbedtls: ssl_tls.c:2813 in_left: 5, nb_want: 152

I (223675) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 147 (-0xffffff6d)

I (223685) mbedtls: ssl_tls.c:2834 <= fetch input

I (223695) mbedtls: ssl_tls.c:1943 => decrypt buf

I (223695) mbedtls: ssl_tls.c:2468 <= decrypt buf

I (223705) mbedtls: ssl_tls.c:4493 <= read record

I (223705) mbedtls: ssl_tls.c:8681 <= read

I (223715) RECV: Received: 1, r:2, ret:2, buf:0y
I (223715) mbedtls: ssl_tls.c:8389 => read

I (223725) mbedtls: ssl_tls.c:8681 <= read

I (223725) RECV: Received: 1, r:1, ret:1, buf:
I (223735) mbedtls: ssl_tls.c:8389 => read

I (223735) mbedtls: ssl_tls.c:8681 <= read

I (223745) RECV: Received: 1, r:120, ret:120, buf:topic/benchmarking
I (223755) receiveControlPacket: Received: ret:120, r:1
I (223755) EVENTLOOP: 03
I (223765) EVENTLOOP: 04
I (223765) EVENTLOOP: 05
I (223765) EVENTLOOP: 1
I (223775) EVENTLOOP: 2
I (223775) EVENTLOOP: 3
I (223775) eMQTT5: Msg received: (0000)
I (223785) eMQTT5:   Topic: topic/benchmarking
I (223785) eMQTT5:   Payload: This is a message with the size of 100 bytes, it will be sent to a topic for MQTT Benchmark Service.
I (223795) EVENTLOOP: 4
I (223805) EVENTLOOP: -1
I (223805) EVENTLOOP: -0.5
I (223805) EVENTLOOP: 0
I (223815) EVENTLOOP: 01
I (223815) EVENTLOOP: 02
I (223815) mbedtls: ssl_tls.c:8389 => read

I (223825) mbedtls: ssl_tls.c:4419 => read record

I (223825) mbedtls: ssl_tls.c:2628 => fetch input

I (223835) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

I (223845) mbedtls: ssl_tls.c:2813 in_left: 0, nb_want: 5

I (223845) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 5 (-0xfffffffb)

I (223855) mbedtls: ssl_tls.c:2834 <= fetch input

I (223865) mbedtls: ssl_tls.c:2628 => fetch input

I (223865) mbedtls: ssl_tls.c:2789 in_left: 5, nb_want: 152

I (223875) mbedtls: ssl_tls.c:2813 in_left: 5, nb_want: 152

I (223875) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 147 (-0xffffff6d)

I (223885) mbedtls: ssl_tls.c:2834 <= fetch input

I (223895) mbedtls: ssl_tls.c:1943 => decrypt buf

I (223895) mbedtls: ssl_tls.c:2468 <= decrypt buf

I (223905) mbedtls: ssl_tls.c:4493 <= read record

I (223905) mbedtls: ssl_tls.c:8681 <= read

I (223915) RECV: Received: 1, r:2, ret:2, buf:0y
I (223925) mbedtls: ssl_tls.c:8389 => read

I (223925) mbedtls: ssl_tls.c:8681 <= read

I (223935) RECV: Received: 1, r:1, ret:1, buf:
I (223935) mbedtls: ssl_tls.c:8389 => read

I (223945) mbedtls: ssl_tls.c:8681 <= read

I (223945) RECV: Received: 1, r:120, ret:120, buf:topic/benchmarking
I (223955) receiveControlPacket: Received: ret:120, r:1
I (223955) EVENTLOOP: 03
I (223965) EVENTLOOP: 04
I (223965) EVENTLOOP: 05
I (223965) EVENTLOOP: 1
I (223975) EVENTLOOP: 2
I (223975) EVENTLOOP: 3
I (223975) eMQTT5: Msg received: (0000)
I (223985) eMQTT5:   Topic: topic/benchmarking
I (223985) eMQTT5:   Payload: This is a message with the size of 100 bytes, it will be sent to a topic for MQTT Benchmark Service.
I (223995) EVENTLOOP: 4
I (224005) EVENTLOOP: -1
I (224005) EVENTLOOP: -0.5
I (224005) EVENTLOOP: 0
I (224015) EVENTLOOP: 01
I (224015) EVENTLOOP: 02
I (224015) mbedtls: ssl_tls.c:8389 => read

I (224025) mbedtls: ssl_tls.c:4419 => read record

I (224025) mbedtls: ssl_tls.c:2628 => fetch input

I (224035) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

I (224045) mbedtls: ssl_tls.c:2813 in_left: 0, nb_want: 5

I (224045) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 5 (-0xfffffffb)

I (224055) mbedtls: ssl_tls.c:2834 <= fetch input

I (224065) mbedtls: ssl_tls.c:2628 => fetch input

I (224065) mbedtls: ssl_tls.c:2789 in_left: 5, nb_want: 152

I (224075) mbedtls: ssl_tls.c:2813 in_left: 5, nb_want: 152

I (224075) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 147 (-0xffffff6d)

I (224085) mbedtls: ssl_tls.c:2834 <= fetch input

I (224095) mbedtls: ssl_tls.c:1943 => decrypt buf

I (224105) mbedtls: ssl_tls.c:2468 <= decrypt buf

I (224105) mbedtls: ssl_tls.c:4493 <= read record

I (224115) mbedtls: ssl_tls.c:8681 <= read

I (224115) RECV: Received: 1, r:2, ret:2, buf:0y
I (224125) mbedtls: ssl_tls.c:8389 => read

I (224125) mbedtls: ssl_tls.c:8681 <= read

I (224135) RECV: Received: 1, r:1, ret:1, buf:
I (224135) mbedtls: ssl_tls.c:8389 => read

I (224145) mbedtls: ssl_tls.c:8681 <= read

I (224145) RECV: Received: 1, r:120, ret:120, buf:topic/benchmarking
I (224155) receiveControlPacket: Received: ret:120, r:1
I (224155) EVENTLOOP: 03
I (224165) EVENTLOOP: 04
I (224165) EVENTLOOP: 05
I (224165) EVENTLOOP: 1
I (224175) EVENTLOOP: 2
I (224175) EVENTLOOP: 3
I (224175) eMQTT5: Msg received: (0000)
I (224185) eMQTT5:   Topic: topic/benchmarking
I (224185) eMQTT5:   Payload: This is a message with the size of 100 bytes, it will be sent to a topic for MQTT Benchmark Service.
I (224195) EVENTLOOP: 4
I (224205) mbedtls: ssl_tls.c:8805 => write

I (224205) mbedtls: ssl_tls.c:3445 => write record

I (224215) mbedtls: ssl_tls.c:1446 => encrypt buf

I (224215) mbedtls: ssl_tls.c:1780 <= encrypt buf

I (224225) mbedtls: ssl_tls.c:2847 => flush output

I (224235) mbedtls: ssl_tls.c:2866 message length: 54, out_left: 54

I (224245) mbedtls: ssl_tls.c:2871 ssl->f_send() returned 54 (-0xffffffca)

I (224245) mbedtls: ssl_tls.c:2899 <= flush output

I (224255) mbedtls: ssl_tls.c:3578 <= write record

I (224255) mbedtls: ssl_tls.c:8833 <= write

I (224265) mbedtls: ssl_tls.c:8389 => read

I (224265) mbedtls: ssl_tls.c:4419 => read record

I (224275) mbedtls: ssl_tls.c:2628 => fetch input

I (224275) mbedtls: ssl_tls.c:2789 in_left: 0, nb_want: 5

I (224285) mbedtls: ssl_tls.c:2813 in_left: 0, nb_want: 5

I (224295) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 5 (-0xfffffffb)

I (224295) mbedtls: ssl_tls.c:2834 <= fetch input

I (224305) mbedtls: ssl_tls.c:2628 => fetch input

I (224305) mbedtls: ssl_tls.c:2789 in_left: 5, nb_want: 152

I (224315) mbedtls: ssl_tls.c:2813 in_left: 5, nb_want: 152

I (224325) mbedtls: ssl_tls.c:2814 ssl->f_recv(_timeout)() returned 147 (-0xffffff6d)

I (224335) mbedtls: ssl_tls.c:2834 <= fetch input

I (224335) mbedtls: ssl_tls.c:1943 => decrypt buf

I (224345) mbedtls: ssl_tls.c:2468 <= decrypt buf

I (224345) mbedtls: ssl_tls.c:4493 <= read record

I (224355) mbedtls: ssl_tls.c:8681 <= read

I (224355) RECV: Received: 1, r:2, ret:2, buf:0y
I (224365) mbedtls: ssl_tls.c:8389 => read

I (224365) mbedtls: ssl_tls.c:8681 <= read

I (224375) RECV: Received: 1, r:1, ret:1, buf:
I (224375) mbedtls: ssl_tls.c:8389 => read

I (224385) mbedtls: ssl_tls.c:8681 <= read

I (224385) RECV: Received: 1, r:120, ret:120, buf:topic/benchmarking
I (224395) receiveControlPacket: Received: ret:120, r:1
E (224405) eMQTT5: Failed to unsubscribe from topic topic/benchmarking, return error: -6

As you can see, when I am sending messages with another client, it does handle them correctly and displays the correct publish payload. Then, it sends a unsubscribe message which DOES reach the broker, but the eMQTT5 client returns a fail for unsubscribing, after which it loses connection.

Here's the broker routing the publishes:

... alot of publishes from otherClient to eMQTT5 ...
1653986788: Received PUBLISH from otherClient (d0, q0, r0, m0, 'topic/benchmarking', ... (100 bytes))
1653986788: Sending PUBLISH to eMQTT5 (d0, q0, r0, m0, 'topic/benchmarking', ... (100 bytes))                  
1653986788: Received UNSUBSCRIBE from eMQTT5 
1653986788:     topic/benchmarking                                                                                      
1653986788: eMQTT5 topic/benchmarking                                                                          
1653986788: Sending UNSUBACK to eMQTT5 
1653986788: Client otherClient closed its connection.        // purposely closed connection                            
1653986850: Client eMQTT5 closed its connection.            // lost connection

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

Ok, I think I've found the culprit. The returned error (-6) is NetworkError and the only possible path is from this line.
Since I haven't ported the unsubscribe method in the esp-eMQTT5 repository, I wonder if you've copy and pasted/replaced the subscribe method. If it's the case, you need to replace the line that read:

if (type == Protocol::MQTT::V5::SUBACK)

with

if (type == Protocol::MQTT::V5::UNSUBACK)

In all cases, I'll port the last change of eMQTT5 to esp-eMQTT5 so it should work. I'll let you know when I'm done

from emqtt5.

yunglook avatar yunglook commented on July 25, 2024

I wonder if you've copy and pasted/replaced the subscribe method

Yes, I did, but I had already replaced the type to check for UNSUBACK. I printed type and it returned 3 (or publish), so I'm guessing that because the eventLoop didn't keep up with the amount of messages published, and unsubscribe just checks the last packet received, it returns the networkError.

Also, I don't think the lastPackedReceived explains the eventLoop getting stuck when not receiving messages? I have implemented the MQTTLock like u said.

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

So, let's sum this a bit, please confirm:

  1. You are using proper mutex, as I've copied here
  2. You are still using more or less the same code as you posted above
  3. You have 2 client and a broker, one (let's call it A is an ESP32 and the log above is for A) the other is B we don't care
  4. If B doesn't publish any message, the event loop is stuck (first part of the log above).
  5. If B publish N messages, A only get less than N messages (how much ?)
  6. When B is unsubscribing, at least a previous message that didn't get through the eventLoop before is now returned instead of UNSUBACK message.

For 4, the log seems correct to me (except that the mbedtls code is not those of esp-idf's codebase). Maybe you can break into a debugger (enable gdbstub in your esp-idf config) and break in another task, so you can figure out where the code is waiting for (in the gdb console, type thread apply all bt and locate the event loop's thread/task). Please notice that when the socket's buffer is empty, it's expected that it times out, and the timeout delay is something you control (in the constructor of the MBTLSSocket). So for the complete time out duration, the event loop is stuck. As soon as a message gets in, it should immediately get unstuck.

It seems there is something wrong with the socket and MBedTLS going on here, so I would double check this first.
I've updated the esp-eMQTTv5 repository, but it shouldn't change much with your code.

from emqtt5.

yunglook avatar yunglook commented on July 25, 2024

Yes, you are correct on every point.
Regarding point 5: For my system, it doesn't matter how many messages are being delivered to A, as I will count them afterwards. Just know that B is rapidly sending messages to topic T, and there is a set period of time in which client A gets called to subscribe to T, and after that period of time unsubscribe from T. This time varies from 1 second to 5 minutes.

Regarding mbedtls and socket code, I changed the connect function in your code to work with PEM formatted self signed certificates:

int connect(const char * host, uint16 port, const unsigned char* rootCert, const unsigned char* clientCert, const unsigned char* clientKey, const unsigned char* passphrase)
        {
            int ret = BaseSocket::connect(host, port, rootCert);
            if (ret) return ret;
            
            net.fd = socket;

            if (::mbedtls_ctr_drbg_seed(&entropySource, ::mbedtls_entropy_func, &entropy, NULL, 0)) 
                return false;

            mbedtls_ssl_config_defaults(&conf, MBEDTLS_SSL_IS_CLIENT, MBEDTLS_SSL_TRANSPORT_STREAM, MBEDTLS_SSL_PRESET_DEFAULT);
            mbedtls_x509_crt_parse(&m_cacert, rootCert, 1+strlen(reinterpret_cast<const char *>(rootCert)));

            mbedtls_ssl_conf_authmode(&conf, MBEDTLS_SSL_VERIFY_REQUIRED);
            
            mbedtls_x509_crt_init(&m_clientcert);
            mbedtls_pk_init(&m_clientkey);

            mbedtls_x509_crt_parse(&m_clientcert, clientCert, 1+strlen(reinterpret_cast<const char *>(clientCert)));
            mbedtls_pk_parse_key(&m_clientkey, clientKey, 1+strlen(reinterpret_cast<const char *>(clientKey)), passphrase, 1+strlen((const char*)passphrase));
            mbedtls_ssl_conf_own_cert(&conf, &m_clientcert, &m_clientkey);

            mbedtls_ssl_conf_ca_chain(&conf, &m_cacert, NULL );
            mbedtls_ssl_conf_authmode(&conf, MBEDTLS_SSL_VERIFY_REQUIRED);

            // // Random number generator
            ::mbedtls_ssl_conf_rng(&conf, ::mbedtls_ctr_drbg_random, &entropySource);

            if (::mbedtls_ssl_setup(&ssl, &conf))
                return false;

            if (::mbedtls_ssl_set_hostname(&ssl, host))                             return -9;
            // Set the method the SSL engine is using to fetch/send data to the other side
            mbedtls_ssl_set_bio(&ssl, &net, mbedtls_net_send, mbedtls_net_recv, NULL);

            ret = ::mbedtls_ssl_handshake(&ssl);
            if (ret != 0 && ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE) 
                return -10;


            mbedtls_esp_enable_debug_log(&conf, 3);
                    
            // Check certificate if one provided
            if (rootCert) 
            {

                uint32_t flags = mbedtls_ssl_get_verify_result(&ssl);
                if (flags != 0)
                { 
#if MQTTDumpCommunication == 1
                    char verify_buf[100] = {0};
                    mbedtls_x509_crt_verify_info(verify_buf, sizeof(verify_buf), "  ! ", flags);
                    printf("mbedtls_ssl_get_verify_result: %s flag: 0x%x\n", verify_buf, flags);
#endif
                    return -11;
                }
            }

            return 0;
        }

This works perfectly with all other functions (publish, subscribe, etc) though, so I think I did this correctly.

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

Yep, it sounds correct. You're calling twice the mbedtls_ssl_conf_authmode function, but I think there's no harm in that.
I guess I'll have to modify the test code in esp-eMQTT5 to something close to your problem so I can reproduce. I'm pretty sure there's something squishy with the mbedtls's BIO functions, but so far it worked for me.

Just a side note, you should really use DER encoded certificate, since they are 33% smaller than X509 PER, and you also skip x509 parsing code in your binary (dual saving in binary size).

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

I've improved the test code from the esp-eMQTT5 repository to build with latest esp-idf version. To build, it's very simple now, go to test folder, run idf.py build and it should produce a binary that you can flash & run on your platform.
I'm not commiting the sdkconfig file, but there's nothing special here.

Please try to modify the code in the test's folder and reproduce your issue there. Once done, please post your changes, and I'll try to reproduce on my side. Thanks!

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

I've tested the test code yesterday night and everything worked as expected.
In the test code, I'm creating a task for running the event loop as well.
Please modify the test code until it shows the issue and let me know.

from emqtt5.

yunglook avatar yunglook commented on July 25, 2024

Hey! First of all, thanks for all the help, the fast replies and everything else :)

I have discovered what caused the eventLoop being stuck. I call the eventLoop based on another thread, based on if a boolean value allows it, true or false. Whenever it was true, and the eventLoop being called, the loop would not stop even if the boolean would be set to false. I fixed this by just deleting the thread whenever it would be false.

I fixed the disconnecting of a client after receiving messages by also accepting a publish message type instead of just an UNSUBACK in the unsubscribe call.

I know both of these solutions are far from perfect, but they are good enough for my application.

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

I've just committed a change to esp-eMQTT5 that should fix your issue better than it is now. I've changed the way the MBEDTLS code is dealing with timeout so the event loop should run more often, you don't need to kill the task anymore. Also, you shouldn't miss any publish message anymore. Use a timeout of 2s or more for no issue (if set to 0, a timeout of 3s is used by default).

There is still a possible race error in the rare event a publish message is sent by the broker at the same time you're sending an unsubscribe packet to the broker so you'll receive this message instead of the unsubscribe ACK message.
Right now, unsubscribing is synchronous (you unsubscribe and wait for the ACK to get the broker's reason code), just like subscribing.

I might have to rework the event loop in the future to also handle UNSUBACK message, but it means that you'd unsubscribe but not get the server's answer when the unsubscribe method is returning, so if you are interested in the broker's reason code, it means you'd have to wait a bit before querying the client.

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on July 25, 2024

Ok. I think I've fixed the potential race condition too, by rewriting the processing logic. Now unsubscribe returns without waiting for UNSUBACK. Such packet is now handled in the eventLoop as other packets.
If you are interested in the Unsubscribe's ACK reason code (unlikely), you can now call client.getUnsubscribeResult() after unsubscribing and running the event loop.

Until the ACK is received, this method will return "WaitingForResult", so it's safe to call. Usually, one one iteration of the event loop is required to get the ACK's result.

from emqtt5.

yunglook avatar yunglook commented on July 25, 2024

Wow, everything works perfectly now! Thank you

from emqtt5.

Related Issues (17)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.