Comments (7)
Hello @ghstahl. See the description of epoch in the docs.
So the epoch describes the generation of a stream, to help properly detect missed messages. Consider a stream like this one:

EPOCH#1: 0, 1, 2, 3

Then the stream was lost and new publications were added to a stream with a new epoch:

EPOCH#2: 0, 1, 2, 3

In this case, when a client tries to recover from EPOCH#1 and offset 2, the history call receives an unrecoverable position error (code 112). And automatic recovery in the client protocol provides the `recovered: false` flag, signalling that state should be loaded from the main application database.
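The epoch check behind this behavior can be sketched as a small Go helper. This is only a sketch: `StreamPosition` and `canRecover` here are local stand-ins of my own naming, not the real centrifuge-go API.

```go
package main

import "fmt"

// StreamPosition mirrors the shape of a Centrifugo stream position:
// an epoch (stream generation) plus an offset within that generation.
type StreamPosition struct {
	Epoch  string
	Offset uint64
}

// canRecover reports whether a client that stopped at `stored` can
// recover missed messages from a stream currently at `current`.
// A different epoch means the stream was re-created, so continuity
// is lost and state must be reloaded from the main database.
func canRecover(stored, current StreamPosition) bool {
	return stored.Epoch == current.Epoch
}

func main() {
	stored := StreamPosition{Epoch: "EPOCH#1", Offset: 2}
	current := StreamPosition{Epoch: "EPOCH#2", Offset: 3}
	fmt.Println(canRecover(stored, current)) // epochs differ: false
}
```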
> Let's assume the following: maxUint64 is 10.
It's not 10, and I'm not sure why we should assume this. You could stream to a channel at 240 Hz for more than a billion years before reaching the max uint64 value. Centrifugo handles the case when the epoch changed and the offset was reset, but it won't handle the case when the epoch is the same and the offset wraps from 18446744073709551615 back to 0, 1, 2 – that's undefined behavior, and there are no plans to change this as there is no practical reason.
from centrifugo.
To get more clarification: my tests stop my Go client and add a few messages to Centrifugo. I also block OnPublication events from Centrifugo until I have caught up.
OnStartup case 1: I load EPOCH#1 and offset 2 from my storage. I get an OnSubscribed event from Centrifugo with stream position EPOCH#2 and offset 4. Since these differ, I go into a history recovery loop. What will happen? From what I can gather I lost everything in EPOCH#1, and I probably should change my stored stream position to EPOCH#2 and offset 0 to get anything.
OnStartup case 2: I load EPOCH#1 and offset 2 from my storage. I get an OnSubscribed event from Centrifugo with stream position EPOCH#1 and offset 5000. Since these differ, I enter a history recovery loop to fetch the missed messages. During that loop I get another OnSubscribed event from Centrifugo (I have seen this happen in my tests). I can assume this means I lost the connection, and I now get a new stream position from Centrifugo that is further along than the initial one. This causes me to shut down my previous catch-up loop and spawn another. I assume I may get a different epoch, like EPOCH#2, in the second OnSubscribed event. What now?
A more high-level question: should I just consider the stream position opaque and deal with whatever errors I get from the history APIs? I probably have no business knowing the internals of StreamPosition at all.
Tests so far: from the looks of it, under failure I just get the epoch of the current subscription if my stored epoch doesn't match, and set my offset to 0.
I set the history config as follows:

```json
{
  "token_hmac_secret_key": "my_secret",
  "api_key": "my_api_key",
  "admin_password": "password",
  "admin_secret": "secret",
  "admin": true,
  "allowed_origins": ["http://localhost:3000"],
  "allow_subscribe_for_client": true,
  "namespaces": [
    {
      "name": "chat",
      "presence": true,
      "join_leave": true,
      "history_size": 2,
      "history_ttl": "300h",
      "force_positioning": true,
      "force_recovery": true,
      "allow_history_for_subscriber": true,
      "allow_publish_for_client": true,
      "allow_subscribe_for_client": true
    }
  ]
}
```
I know my epoch and made a history call with `{"Offset":0,"Epoch":"FMvS"}`. Sending in the wrong epoch I expect an error – good: `{"error":"112: unrecoverable position"}`
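A manual catch-up loop built on top of such history calls might look like the following sketch. All types and the `historyFn` abstraction are stand-ins of mine; a real implementation would call the subscription's history method and treat the code 112 error as "reload from the main database":

```go
package main

import "fmt"

// StreamPosition and Publication are local stand-ins for the SDK types.
type StreamPosition struct {
	Epoch  string
	Offset uint64
}

type Publication struct {
	Offset uint64
	Data   []byte
}

// historyFn abstracts one history call: given a "since" position it
// returns the publications after it. On an epoch mismatch the real
// call would fail with Centrifugo's code 112 "unrecoverable position".
type historyFn func(since StreamPosition) ([]Publication, error)

// catchUp pages through history from `since` until no publications
// remain, advancing the offset as each publication is handled.
func catchUp(since StreamPosition, history historyFn, handle func(Publication)) error {
	for {
		pubs, err := history(since)
		if err != nil {
			return err // e.g. code 112: reload state from the main database
		}
		if len(pubs) == 0 {
			return nil // caught up
		}
		for _, p := range pubs {
			handle(p)
			since.Offset = p.Offset
		}
	}
}

func main() {
	// Fake history returning one publication per page.
	all := []Publication{{Offset: 1}, {Offset: 2}, {Offset: 3}}
	history := func(since StreamPosition) ([]Publication, error) {
		for _, p := range all {
			if p.Offset > since.Offset {
				return []Publication{p}, nil
			}
		}
		return nil, nil
	}
	var got []uint64
	_ = catchUp(StreamPosition{Epoch: "FMvS", Offset: 0}, history, func(p Publication) {
		got = append(got, p.Offset)
	})
	fmt.Println(got) // [1 2 3]
}
```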
I only add 2 messages and I get my 2 from history – good:

```json
{
  "level": "info",
  "batch": {
    "Publications": [
      {
        "Offset": 1,
        "Data": "eyJpbnB1dCI6ImgwMDAifQ==",
        "Info": {
          "Client": "9a7eca8c-545c-4c8c-a150-884720d65589",
          "User": "49",
          "ConnInfo": null,
          "ChanInfo": null
        },
        "Tags": null
      },
      {
        "Offset": 2,
        "Data": "eyJpbnB1dCI6ImgwMDEifQ==",
        "Info": {
          "Client": "0252a95d-dff7-4a25-a011-27d849b23684",
          "User": "49",
          "ConnInfo": null,
          "ChanInfo": null
        },
        "Tags": null
      }
    ]
  }
}
```
I sent 10 messages, so the theory is that only the last 2 are kept for 300h. I got the last two even though I said my offset == 0. Good:

```json
{
  "level": "info",
  "batch": {
    "Publications": [
      {
        "Offset": 9,
        "Data": "eyJpbnB1dCI6ImgwMDgifQ==",
        "Info": {
          "Client": "40c844c8-f150-4573-93b4-bd4fc3bb229d",
          "User": "49",
          "ConnInfo": null,
          "ChanInfo": null
        },
        "Tags": null
      },
      {
        "Offset": 10,
        "Data": "eyJpbnB1dCI6ImgwMDkifQ==",
        "Info": {
          "Client": "126ca5f1-829c-4f18-b0de-7c1e8781e8e5",
          "User": "49",
          "ConnInfo": null,
          "ChanInfo": null
        },
        "Tags": null
      }
    ]
  }
}
```
Centrifugo SDKs deal with recovery automatically as soon as recovery is enabled; looking at your configuration it's enabled (`"force_recovery": true`). At the same time, the client API exposes the stream position and the possibility to call the history API manually, so there is a way to construct your own recovery mechanism on top of Centrifugo's history API.
But I do not understand what problem you are solving. You have automatic recovery, and at the same time you want to understand internals as if you want to recover manually – these two facts do not match in my head.
Generally, when using Centrifugo's auto recovery feature you only need to rely on the `recovered` flag in the `subscribed` event context, without looking at the stream position at all. When the flag is true, all messages were recovered from Centrifugo; if false, it's a signal to load state from scratch from the main database.
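That advice reduces to a tiny decision rule. A sketch, with a hypothetical helper name rather than anything from the SDK:

```go
package main

import "fmt"

// onSubscribedAction captures the rule: trust only the recovered flag
// from the subscribed event context, never the raw stream position.
func onSubscribedAction(recovered bool) string {
	if recovered {
		return "continue" // Centrifugo replayed every missed publication
	}
	return "reload" // continuity lost: rebuild state from the main database
}

func main() {
	fmt.Println(onSubscribedAction(true))  // continue
	fmt.Println(onSubscribedAction(false)) // reload
}
```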
TLDR: is the centrifuge-go client SDK history aware, or am I developing a new client SDK that accounts for historical data?
It would be nice if Centrifugo had a Kafka-type commit that is tracked server side per client connection.
What I am trying to do: I have a single consumer for a channel. The messages I get from Centrifugo are sent into a Benthos stream, which ACKs back to me, and that message's StreamPosition gets put into persistent storage – doing my own Kafka-type commit here.
When my consumer goes down for a while, I rely on persistent storage to know the last stream position I was at, stream_position_0. I get an initial OnSubscribed event that tells me the current Centrifugo stream position, stream_position_A. That's the startup catch-up: I block the SDK at the OnPublication event until I am caught up, i.e. pull data from stream_position_0 to stream_position_A.
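The Kafka-type commit described above can be sketched as a small store that persists the last ACKed stream position per channel. This is only a sketch under my own naming; `StreamPosition` is a local stand-in, and a real version would write to durable storage instead of a map:

```go
package main

import (
	"fmt"
	"sync"
)

// StreamPosition is a local stand-in for the SDK's stream position.
type StreamPosition struct {
	Epoch  string
	Offset uint64
}

// commitStore records the last ACKed stream position per channel,
// mimicking a Kafka-style consumer commit.
type commitStore struct {
	mu        sync.Mutex
	positions map[string]StreamPosition
}

func newCommitStore() *commitStore {
	return &commitStore{positions: make(map[string]StreamPosition)}
}

// Commit records pos for channel after the downstream pipeline ACKs it.
func (s *commitStore) Commit(channel string, pos StreamPosition) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.positions[channel] = pos
}

// Load returns the last committed position – i.e. stream_position_0
// on the next startup – and whether one exists.
func (s *commitStore) Load(channel string) (StreamPosition, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	pos, ok := s.positions[channel]
	return pos, ok
}

func main() {
	store := newCommitStore()
	store.Commit("chat:index", StreamPosition{Epoch: "FMvS", Offset: 2})
	pos, _ := store.Load("chat:index")
	fmt.Println(pos.Offset) // 2
}
```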
Let's assume my catch-up takes a long time and many messages have come into Centrifugo, so the stream_position_A I got earlier is now behind. The real stream position is now stream_position_B, but I don't know this, and I assume neither does the client SDK. Will the client SDK handle these new messages? I.e. can I trust that the OnPublication events, once unblocked, start at stream_position_A and do their own catch-up through stream_position_B and beyond?
Is there ever a case where the centrifuge-go client is blocked (by me at the OnPublication event), loses connection to the server, and upon doing a reconnect has now lost messages?
Continual catch-up solution: if at any time the OnPublication event delivers a message that is more than 1 ahead of where I think I am, I must enter a catch-up loop. If at any time the OnSubscribed event delivers a stream position that is ahead of where I think I am, I must enter a catch-up loop.
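The two triggers above reduce to simple offset comparisons. A sketch (the function names are mine, not SDK API):

```go
package main

import "fmt"

// publicationGap reports the first trigger: a delivered publication
// should carry offset lastProcessed+1; anything further means a gap.
func publicationGap(lastProcessed, pubOffset uint64) bool {
	return pubOffset > lastProcessed+1
}

// subscribedBehind reports the second trigger: the server's current
// stream position is ahead of what we have processed.
func subscribedBehind(lastProcessed, serverOffset uint64) bool {
	return serverOffset > lastProcessed
}

func main() {
	fmt.Println(publicationGap(2, 3))    // next in sequence: false
	fmt.Println(publicationGap(2, 5))    // offsets 3..4 missing: true
	fmt.Println(subscribedBehind(2, 4))  // server is ahead: true
}
```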
> It would be nice if Centrifugo had a Kafka-type commit that is tracked server side per client connection.
I doubt it will happen in the foreseeable future. Centrifugo is a different model than Kafka, more focused on in-memory short-term streams which let applications recover missed messages upon short-term reconnect. This becomes important when lots of connections reconnect at the same time, to avoid aggressive load on the main storage.
Maybe one day we will shift towards more persistent streams and consumers similar to Kafka; we haven't considered this seriously though, since that model assumes disk storage and data replication – there are already systems which do it well – so it's out of scope for Centrifugo at this point.
> Is there ever a case where the centrifuge-go client is blocked (by me at the OnPublication event), loses connection to the server, and upon doing a reconnect has now lost messages?
Yes, it can lose messages, but it will provide the `recovered: false` flag in the `subscribed` event context, which tells you about that. In that case state should be restored from the source of truth – in the usual use case this is the application's main database. But since you are using Centrifugo as part of a pipeline, I am not sure what to do in that case.
Generally, messages may be lost for several reasons:
- Centrifugo's default broker is in-memory – any Centrifugo restart drops publication history and the epoch, so recovery is impossible, and clients using the auto recovery feature will get `recovered: false` after reconnecting.
- With Redis, persistence depends on Redis configuration. Without RDB and AOF, a Redis restart also drops publications and the epoch. Again, clients get `recovered: false` as a signal to reload state from the source of truth.
- History has a TTL and a max size, so messages may be dropped by retention policies; again, messages are lost but the recovered flag is properly set.
- There is also a limit on the number of messages which can be automatically recovered – see the docs – which controls the max number of messages clients can load during automatic recovery. The recovered flag is properly set to false if the limit is exceeded.
When Centrifugo sees that it's impossible to recover continuity in the stream for a client, it won't send any publications to that client, since in most cases that does not make sense: the current state should be reloaded from the source of truth (the app database).
I guess this may be closed