
Comments (7)

FZambia commented on June 7, 2024

Hello @ghstahl

See some description of epoch in docs.

So epoch describes the generation of the stream, to help properly detect missed messages in case the stream was lost. Say there was a stream like this one:

EPOCH#1 0, 1, 2, 3

And then the stream was lost and new publications were added to a stream with a new epoch:

EPOCH#2 0, 1, 2, 3

In this case, when the client tries to recover from EPOCH#1 and offset 2, the history call will return an unrecoverable position error (code 112). And automatic recovery in the client protocol will provide a recovered: false flag, signalling that state should be loaded from the main application database.

Let's assume the following.
maxUint64 is 10.

It's not 10, and I'm not sure why we should assume this. You could stream to a channel at 240 Hz for more than a billion years before reaching the max uint64 value. Centrifugo handles the case when the epoch changed and the offset was reset, but it won't handle the case when the epoch is the same and the offset wraps from 18446744073709551615 back to 0, 1, 2 – that's undefined behavior, and there are no plans to change this as there is no practical reason to.

from centrifugo.

ghstahl commented on June 7, 2024

To get more clarification:
My tests stop my Go client and add a few messages to Centrifugo.
I also block OnPublication events from Centrifugo until I have caught up.

OnStartup case 1:

I load from my storage EPOCH#1 and offset 2

I get an OnSubscribed event from Centrifugo with stream position:
EPOCH#2 and offset 4

Being different, this causes me to go into a history recovery loop.

What will happen? From what I can gather, I lost everything in EPOCH#1 and should probably change my historical stream position to
EPOCH#2 and offset 0 to get something.

OnStartup case 2:

I load from my storage EPOCH#1 and offset 2

I get an OnSubscribed event from Centrifugo with stream position:
EPOCH#1 and offset 5000

Being different, I enter a history recovery loop to fetch those missed messages.

During that loop I get another OnSubscribed event from Centrifugo; I have seen this happen in my tests.
I assume this means I lost the connection, and I now get a new stream position from Centrifugo that is further along than the initial one.
This causes me to shut down my previous catch-up loop and spawn another.

I assume I may get a different epoch, like EPOCH#2, in the second OnSubscribed event. What now?

A more high-level question: should I just consider the stream position as opaque and deal with whatever errors I get from the history APIs? I probably have no business even knowing what the internals of StreamPosition are.


ghstahl commented on June 7, 2024

Tests so far.

From the looks of it, under failure I just take the epoch of the current subscription if my stored epoch doesn't match, and set my offset to 0.

I set the history config as follows:

{
  "token_hmac_secret_key": "my_secret",
  "api_key": "my_api_key",
  "admin_password": "password",
  "admin_secret": "secret",
  "admin": true,
  "allowed_origins": ["http://localhost:3000"],
  "allow_subscribe_for_client": true,
  "namespaces": [
    {
      "name": "chat",
      "presence": true,
      "join_leave": true,
      "history_size": 2,
      "history_ttl": "300h",
      "force_positioning": true,
      "force_recovery": true,
      "allow_history_for_subscriber": true,
      "allow_publish_for_client": true,
      "allow_subscribe_for_client": true
    }
  ]
}

I know my epoch and made a history call with:

{"Offset":0,"Epoch":"FMvS"}

Sending in the wrong epoch, I expect an error – good:

{"error":"112: unrecoverable position"}

I only add 2 messages and I get my 2 from history – good:

{
   "level": "info",
   "batch": {
       "Publications": [
           {
               "Offset": 1,
               "Data": "eyJpbnB1dCI6ImgwMDAifQ==",
               "Info": {
                   "Client": "9a7eca8c-545c-4c8c-a150-884720d65589",
                   "User": "49",
                   "ConnInfo": null,
                   "ChanInfo": null
               },
               "Tags": null
           },
           {
               "Offset": 2,
               "Data": "eyJpbnB1dCI6ImgwMDEifQ==",
               "Info": {
                   "Client": "0252a95d-dff7-4a25-a011-27d849b23684",
                   "User": "49",
                   "ConnInfo": null,
                   "ChanInfo": null
               },
               "Tags": null
           }
       ]
   }
}

I sent 10 messages, so the theory is that only the last 2 are kept for 300h.
I got the last two even though I said my offset == 0.
Good.

{
    "level": "info",
    "batch": {
        "Publications": [
            {
                "Offset": 9,
                "Data": "eyJpbnB1dCI6ImgwMDgifQ==",
                "Info": {
                    "Client": "40c844c8-f150-4573-93b4-bd4fc3bb229d",
                    "User": "49",
                    "ConnInfo": null,
                    "ChanInfo": null
                },
                "Tags": null
            },
            {
                "Offset": 10,
                "Data": "eyJpbnB1dCI6ImgwMDkifQ==",
                "Info": {
                    "Client": "126ca5f1-829c-4f18-b0de-7c1e8781e8e5",
                    "User": "49",
                    "ConnInfo": null,
                    "ChanInfo": null
                },
                "Tags": null
            }
        ]
    } 
}


FZambia commented on June 7, 2024

Centrifugo SDKs deal with recovery automatically as soon as recovery is enabled; looking at your configuration, it's enabled ("force_recovery": true).

At the same time, the client API exposes the stream position and the possibility to call the history API manually – so there is a way to construct your own recovery mechanism on top of Centrifugo's history API.

But I do not understand what problem you are solving. You have automatic recovery, and at the same time you want to understand internals as if you want to recover manually – these two facts do not match in my head.

Generally, when using the auto recovery feature of Centrifugo you need to rely on the recovered flag in the subscribed event context only – without looking at the stream position at all. When the flag is true, all messages were recovered from Centrifugo; if false, it's a signal to load state from scratch from the main database.


ghstahl commented on June 7, 2024

TLDR;

Is the centrifuge-go client SDK history aware, or am I developing a new client SDK that accounts for historical data?
It would be nice if Centrifugo had a Kafka-type commit that is tracked server side per client connection.

What I am trying to do.

I have a single consumer for a channel.
The messages I get from Centrifugo are sent into a Benthos stream, which ACKs back to me, and that message's StreamPosition gets put into persistent storage.
I'm doing my own Kafka-type commit thing here.

When my consumer goes down for a while, I rely on persistent storage to know the last stream position I was at: stream_position_0.
I get an initial OnSubscribed event that tells me the current Centrifugo stream position:
stream_position_A

That's the startup catch-up.
I block the SDK at the OnPublication event until I am caught up,
i.e. pull data from stream_position_0 to stream_position_A.

Let's assume that my catch-up takes a long time and many messages have come into Centrifugo, which makes the stream_position_A I got earlier out of date. The real stream position is now stream_position_B, but I don't know this, and I assume neither does the client SDK. Will the client SDK handle these new messages? I.e. can I trust that the OnPublication events, once unblocked, start at stream_position_A and do their own catch-up to stream_position_B and beyond?

Is there ever a case where the centrifuge-go client is blocked (by me at the OnPublication event), loses connection to the server, and upon reconnecting has now lost messages?

Continual catch-up solution:

If at any time the OnPublication event delivers a message that is more than 1 ahead of where I think I am, I must enter a catch-up loop.

If at any time the OnSubscribed event delivers a stream position that is ahead of where I think I am, I must enter a catch-up loop.


FZambia commented on June 7, 2024

It would be nice if Centrifugo had a Kafka-type commit that is tracked server side per client connection.

I doubt it will happen in the observable future. Centrifugo is a different model than Kafka, more focused on in-memory short-term streams which help applications recover missed messages upon short-term reconnect. This becomes important when lots of connections reconnect at the same time, to avoid aggressive load on the main storage.

Maybe one day we will shift towards more persistent streams and consumers similar to Kafka; I have not considered this seriously though, since this model assumes disk storage and data replication – there are already systems which do this well, so it's out of scope for Centrifugo at this point.

Is there ever a case where the centrifuge-go client is blocked (by me at the OnPublication event), loses connection to the server, and upon reconnecting has now lost messages?

Yes, it can lose messages, but it will provide a recovered: false flag in the subscribed event context which tells you about that. In that case, state should be restored from the source of truth – in the usual use case this is the application's main database. But since you are using Centrifugo as part of a pipeline, I am not sure what to do in that case.

Generally, messages may be lost for several reasons:

  • Centrifugo's default broker is in-memory – any Centrifugo restart results in the publication history and epoch being dropped, so it's impossible to recover, and when using the auto recovery feature clients will get recovered: false after reconnecting.
  • With Redis, persistence depends on the Redis configuration. Without RDB and AOF, a restart of Redis will also result in publications and the epoch being dropped. And again, clients will get recovered: false as a signal to reload state from the source of truth.
  • History has a TTL and a max size – so messages may be lost due to retention policies; again, messages are lost but the recovered flag will be properly set.
  • There is also a limit on the number of messages which can be automatically recovered – see the docs – which controls the max number of messages clients can load during automatic recovery. The recovered flag is properly set to false if the limit is exceeded.

When Centrifugo sees that it's impossible to recover continuity in the stream for the client, it won't send any publications to the client – since in most cases it does not make sense, as the current state should be reloaded from the source of truth (the app database).


FZambia commented on June 7, 2024

I guess this may be closed.


