
msg-rs's Introduction

MSG-RS

A flexible and lightweight messaging library for distributed systems built with Rust and Tokio.

CI License Book Discord

Overview

msg-rs is a messaging library inspired by projects like ZeroMQ and Nanomsg. It was built because we needed a Rust-native messaging library like them.

MSG is still in ALPHA and is not ready for production use.

Documentation

The MSG-RS Book contains detailed information on how to use the library.

Features

  • Multiple socket types
    • Request/Reply
    • Publish/Subscribe
    • Channel
    • Push/Pull
    • Survey/Respond
  • Stats (RTT, throughput, packet drops etc.)
  • Request/Reply basic stats
  • Queuing
  • Pluggable transport layer
    • TCP
    • TLS
    • IPC
    • UDP
    • Inproc
  • Durable IO abstraction (built-in retries and reconnections)
  • Simulation modes with Turmoil

MSRV

The minimum supported Rust version is 1.70.

Contributions & Bug Reports

If you are interested in contributing or have found a bug, please check out the contributing guide. Please report any bugs or questions you encounter by opening a GitHub issue.

Additionally, you can reach out to us on Discord if you have any questions or just want to chat.

License

This project is licensed under the MIT license.

msg-rs's People

Contributors

mempirate, merklefruit, quangkeu95, solidoracle, patstiles


msg-rs's Issues

Proposal: `msg-sim` crate

Context

Being able to simulate latency, bandwidth, packet loss etc. ("shaping traffic") to reflect the real world is incredibly useful for testing & benchmarking different protocols. It's how we discovered that QUIC is in most cases actually slower than TCP. We used comcast to add 50ms of latency and a max bandwidth of 1Gbps to localhost, and then ran our benchmarks again.

Proposal

It would be nice to be able to shape traffic in unit tests, integration tests and benchmarks inside of MSG, so I'm proposing the msg-sim crate to do just that. msg-sim will have the following features:

  • Support both macOS and Linux
  • Highly configurable
  • Standalone crate for use in other projects
  • Will be able to do everything comcast can do

Simulation

We usually use localhost for testing, which is the loopback interface. Aside from it being a virtual network interface, it also has a much higher MTU than normal, doesn't use Ethernet etc.

qol: use `ToSocketAddrs` trait for connections

tokio::net exposes a ToSocketAddrs trait (docs: https://docs.rs/tokio/latest/tokio/net/trait.ToSocketAddrs.html),
which can be handy for resolving DNS names and binding to a socket address.

Here is how this is used in Tokio's TcpListener:

docs: https://docs.rs/tokio/latest/src/tokio/net/tcp/listener.rs.html#100-118

pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
    let addrs = to_socket_addrs(addr).await?;

    let mut last_err = None;

    for addr in addrs {
        match TcpListener::bind_addr(addr) {
            Ok(listener) => return Ok(listener),
            Err(e) => last_err = Some(e),
        }
    }

    Err(last_err.unwrap_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "could not resolve to any address",
        )
    }))
}

We could use this in the public facing socket API :)
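A minimal sketch of what that could look like in a public bind API. For brevity this uses std's ToSocketAddrs (the tokio trait works the same way, just async); `bind_any` is a hypothetical name, not an existing msg-rs function:

```rust
use std::io;
use std::net::{TcpListener, ToSocketAddrs};

// Hypothetical helper: accept anything resolvable (DNS names included) and
// bind to the first address that works, mirroring tokio's TcpListener::bind.
fn bind_any<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
    let mut last_err = None;
    for addr in addr.to_socket_addrs()? {
        match TcpListener::bind(addr) {
            Ok(listener) => return Ok(listener),
            Err(e) => last_err = Some(e),
        }
    }
    Err(last_err.unwrap_or_else(|| {
        io::Error::new(io::ErrorKind::InvalidInput, "could not resolve to any address")
    }))
}
```

With this, callers could write `socket.bind("localhost:5555")` instead of constructing a `SocketAddr` by hand.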

Reasonable backoff strategies

Context

Our current backoff strategy is just retrying every 100ms. This should be configurable, and we should support both linear and exponential backoff strategies.

The current code is here:

fn on_disconnect(mut self: Pin<&mut Self>, cx: &mut Context<'_>) {
    // We copy here because we can't do it after borrowing self in the match below
    let endpoint = self.endpoint;
    match &mut self.state {
        SessionState::Connected(_) => {
            error!("Session was disconnected from {}", self.endpoint);
            self.state = SessionState::Disconnected(ReconnectStatus {
                attempts: 0,
                current_attempt: Some(Io::establish(self.endpoint)),
            });
        }
        SessionState::Disconnected(reconnect_status) => {
            debug!(
                attempts = reconnect_status.attempts,
                "Reconnect failed, retrying..."
            );
            reconnect_status.attempts += 1;
            // Start and set the new reconnect attempt
            let attempt = Box::pin(async move {
                tokio::time::sleep(Duration::from_millis(100)).await;
                Io::establish(endpoint).await
            });
            reconnect_status.current_attempt = Some(attempt);
        }
        SessionState::Processing(_) => {
            error!("Session was disconnected from {}", self.endpoint);
            self.state = SessionState::Disconnected(ReconnectStatus {
                attempts: 0,
                current_attempt: Some(Io::establish(self.endpoint)),
            });
        }
        SessionState::Terminated(_) => {
            unreachable!("Session was already terminated")
        }
    }
    // Register the waker to make sure the reconnect attempt is polled
    cx.waker().wake_by_ref();
}
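One way to make the strategy pluggable is to model a backoff as an iterator of delays, so linear and exponential policies can be swapped in where the hard-coded 100ms sleep lives today. A sketch under that assumption (`ExponentialBackoff` is an illustrative name, not existing msg-rs API):

```rust
use std::time::Duration;

/// Hypothetical backoff strategy: an iterator yielding the delay before each
/// reconnect attempt. A linear strategy would implement the same trait shape.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
    base: Duration,
    max: Duration,
    attempt: u32,
}

impl ExponentialBackoff {
    pub fn new(base: Duration, max: Duration) -> Self {
        Self { base, max, attempt: 0 }
    }
}

impl Iterator for ExponentialBackoff {
    type Item = Duration;

    fn next(&mut self) -> Option<Duration> {
        // base * 2^attempt, capped at `max` (shift capped to avoid overflow).
        let delay = self.base.saturating_mul(1u32 << self.attempt.min(16));
        self.attempt += 1;
        Some(delay.min(self.max))
    }
}
```

`ReconnectStatus` could then hold a `Box<dyn Iterator<Item = Duration>>` and `on_disconnect` would sleep for `backoff.next()` instead of the fixed 100ms.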

`SubSocket`: specify decompressor on a connection level

Context

Currently we can only specify a single decompressor that is used across all publisher connections. It is likely, however, that a subscriber will subscribe to multiple publishers that all use different compression formats. We should therefore be able to specify a decompressor per connection (i.e. as a connect option).
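A rough sketch of one possible shape: keep a per-connection map keyed by publisher address, falling back to a socket-wide default. All names here (`SubState`, `decompressor_for`, the `Decompressor` trait shape) are illustrative, not the actual msg-rs API:

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

/// Illustrative decompressor trait (msg-rs uses a combined Compressor trait).
trait Decompressor {
    fn decompress(&self, data: &[u8]) -> Vec<u8>;
}

/// Pass-through decompressor for uncompressed publishers.
struct NoopDecompressor;
impl Decompressor for NoopDecompressor {
    fn decompress(&self, data: &[u8]) -> Vec<u8> {
        data.to_vec()
    }
}

/// Hypothetical subscriber state: per-connection decompressors with a default.
struct SubState {
    default: Box<dyn Decompressor>,
    per_connection: HashMap<SocketAddr, Box<dyn Decompressor>>,
}

impl SubState {
    /// Resolve the decompressor for a given publisher connection.
    fn decompressor_for(&self, addr: &SocketAddr) -> &dyn Decompressor {
        self.per_connection
            .get(addr)
            .map(|d| &**d)
            .unwrap_or(&*self.default)
    }
}
```

On connect, a `connect_with_options`-style call would insert into `per_connection`.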

feat: subscription policies

As part of #13, we want to add the ability for subscribers to specify the "durability policy" that they want to use.
Here's how that would look:

  • subscriber (ReqSocket or SubSocket) connects to a target, specifying the policy: if they would like to receive messages from the connection point onward, or if they want to also receive cached messages that were already broadcasted earlier than the connection time.
  • On the publisher's side there will always be a limit on how many messages to replay for a specific subscriber. Nonetheless, this will be useful when clients are starting up, to make sure they don't lose initial messages.

feat(reqrep): Add compression

We should add Compression to reqrep sockets like we did for pubsub.

It should work the same way: add a compression identifier in the header to specify how the message was compressed, and decompress messages based on that.

`flush_interval` for `ReqSocket` and `RepSocket`

Same as with the PubSocket, any high-throughput socket writer should have an optional flush_interval in the options that should be respected when flushing the Framed connection. Calling flush too often adds a lot of syscall overhead as discussed in #11.

The default here should actually be None imo, and flushing should happen after every send, since the usual behaviour for request response (at least in our case) will be low-latency and low throughput. But it should be configurable for other use cases nonetheless. The benchmarks should also play around with this option.
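A sketch of how the option could be shaped, with `None` meaning "flush after every send" as proposed above (the `ReqOptions` name and builder method are illustrative, not the actual msg-rs API):

```rust
use std::time::Duration;

/// Hypothetical socket options: `flush_interval: None` flushes after every
/// send (low-latency default); `Some(interval)` batches flushes to cut
/// syscall overhead for high-throughput use cases.
#[derive(Debug, Clone)]
pub struct ReqOptions {
    pub flush_interval: Option<Duration>,
}

impl Default for ReqOptions {
    fn default() -> Self {
        Self { flush_interval: None }
    }
}

impl ReqOptions {
    /// Builder-style setter, mirroring how other socket options are configured.
    pub fn with_flush_interval(mut self, interval: Duration) -> Self {
        self.flush_interval = Some(interval);
        self
    }
}
```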

Tasks

Implement keepalives

In some situations, keepalives can be useful for making sure the connection stays up. We should implement a simple Ping/Pong mechanism to do this.
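The bookkeeping for such a Ping/Pong mechanism can be sketched as follows, assuming the driver checks these predicates on each poll (the `Keepalive` type and its field names are hypothetical):

```rust
use std::time::{Duration, Instant};

/// Hypothetical keepalive state: track when we last heard from the peer,
/// send a Ping when the connection is idle, and declare it dead on timeout.
struct Keepalive {
    interval: Duration, // idle time before sending a Ping
    timeout: Duration,  // idle time before declaring the peer dead
    last_seen: Instant, // last frame (including Pong) received
    last_ping: Instant, // last Ping we sent
}

impl Keepalive {
    fn new(interval: Duration, timeout: Duration) -> Self {
        let now = Instant::now();
        Self { interval, timeout, last_seen: now, last_ping: now }
    }

    /// Called whenever any frame (including a Pong) arrives.
    fn on_frame(&mut self) {
        self.last_seen = Instant::now();
    }

    /// Should the driver send a Ping now? (The caller updates `last_ping`.)
    fn should_ping(&self, now: Instant) -> bool {
        now.duration_since(self.last_seen) >= self.interval
            && now.duration_since(self.last_ping) >= self.interval
    }

    /// Has the peer been silent for longer than the timeout?
    fn is_dead(&self, now: Instant) -> bool {
        now.duration_since(self.last_seen) >= self.timeout
    }
}
```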

Tasks

Support request timeouts

ReqSocket should support request timeouts, i.e. all pending requests are given an expiration timestamp according to the config, and every interval (<= timeout duration), we evict timed out requests.
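The eviction sweep described above can be sketched like this; in the driver it would run on a `tokio::time::Interval`, but the core logic is synchronous (the `PendingRequests` type is illustrative, not the actual msg-rs structure):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical pending-request table: each request is stamped with its
/// creation time, and a periodic sweep evicts anything past the timeout.
struct PendingRequests {
    timeout: Duration,
    pending: HashMap<u32, Instant>, // request id -> creation time
}

impl PendingRequests {
    fn insert(&mut self, id: u32, now: Instant) {
        self.pending.insert(id, now);
    }

    /// Evict requests older than `timeout`; returns the evicted ids so the
    /// driver can fail the corresponding response channels.
    fn evict_expired(&mut self, now: Instant) -> Vec<u32> {
        let timeout = self.timeout;
        let expired: Vec<u32> = self
            .pending
            .iter()
            .filter(|(_, start)| now.duration_since(**start) >= timeout)
            .map(|(id, _)| *id)
            .collect();
        for id in &expired {
            self.pending.remove(id);
        }
        expired
    }
}
```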

`msg-sim` Linux implementation

Context

Since #54, we have an initial implementation of our network simulation crate for macOS, but we're still missing one for Linux. The simulation crate allows us to start simulated endpoints in Rust tests that we can then use to emulate a real-world environment. The macOS one uses dummynet with some ifconfig commands and a hacky localhost alias to configure a simulated endpoint. We want to do the same for Linux, but luckily Linux has some better tools for these types of things:

  • ip command: with the ip command, we can create network namespaces, virtual ethernet devices, dummy interfaces etc. I think dummy interfaces specifically will be most useful in the beginning (it's basically a separate localhost but with normal MTU for more accurate simulation).
  • tc command: the tc (traffic-control) command can be used with netem (a kernel module) to shape traffic.

Some inspiration:

Other notes:

  • These commands all require root access. Maybe there's a way to utilize network namespaces to not need root access? The CAP_NET_ADMIN capability also allows us to do this without root, but I don't think it's useful when running tests, and granting capabilities requires root anyway.
  • What other things can we do with network namespaces? A completely simulated p2p network where each peer is in its own namespace and can have its own latency, bandwidth and packet loss rates? This will need veth or virtual ethernet devices as well.
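To make the ip/tc idea concrete, here is an illustrative sequence of the commands msg-sim might run under the hood on Linux. Device names and parameters are examples only, and all of this requires root or CAP_NET_ADMIN:

```shell
# Create a dummy interface as a separate "localhost" with a normal MTU,
# so the simulation doesn't inherit loopback's oversized MTU.
ip link add sim0 type dummy
ip addr add 192.168.50.1/24 dev sim0
ip link set sim0 up mtu 1500

# Shape traffic on it with tc + netem: 50ms latency, 1% loss, 1Gbit cap.
tc qdisc add dev sim0 root netem delay 50ms loss 1% rate 1gbit

# Tear down when the test is done.
tc qdisc del dev sim0 root
ip link del sim0
```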

feat(bench): throughput benchmarks

We want to measure the throughput of different sockets in localhost to track possible improvements and changes over time.

In terms of libraries that we can use, we should look into:

As part of this issue, we should make a throughput benchmark for the existing socket types and make the process of adding a new benchmark as simple as possible for the future.

feat: driver high-water marks

The current req and pub drivers have infinite capacity. This means that if messages accumulate in the pending-requests queue for some reason, the application will eventually run out of memory and crash.

  • example in req driver:

// Check for outgoing messages from the socket handle
match this.from_socket.poll_recv(cx) {
    Poll::Ready(Some(Command::Send { message, response })) => {
        // Queue the message for sending
        let start = std::time::Instant::now();
        let msg = this.new_message(message);
        let id = msg.id();
        this.egress_queue.push_back(msg);
        this.pending_requests.insert(
            id,
            PendingRequest {
                start,
                sender: response,
            },
        );
    }
    // ...
}
We should add some sort of HWM that either blocks or drops messages after a specific capacity is reached.

An option could be to use LruCache instead of the current FxHashMap.
Further improvements include different StorageBackends including layered cache between memory and disk (e.g. LruCache -> RocksDB)
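A minimal sketch of the HWM idea: a bounded queue with a configurable full-queue policy. The names (`HwmPolicy`, `BoundedQueue`) are illustrative, not existing msg-rs types:

```rust
use std::collections::VecDeque;

/// Hypothetical policy for what happens once the high-water mark is reached.
enum HwmPolicy {
    /// Silently discard new messages (caller may bump a drop counter).
    Drop,
    /// Hand the message back so the caller can apply backpressure and retry.
    Block,
}

/// A queue that refuses to grow past `hwm` entries.
struct BoundedQueue<T> {
    hwm: usize,
    policy: HwmPolicy,
    inner: VecDeque<T>,
}

impl<T> BoundedQueue<T> {
    fn push(&mut self, item: T) -> Result<(), T> {
        if self.inner.len() >= self.hwm {
            return match self.policy {
                HwmPolicy::Drop => Ok(()),    // message dropped on the floor
                HwmPolicy::Block => Err(item), // caller retries later
            };
        }
        self.inner.push_back(item);
        Ok(())
    }
}
```

The same capacity check would apply to `pending_requests` (where an LruCache, as suggested above, would bound it implicitly).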


Edit:

  • See conversation in #9 (comment)
  • On the subscriber level, a durability Policy setting should be added to indicate whether we intend to receive cached messages or just the ones sent from the connection point onward.

`ConnectionHooks` trait for user-provided callbacks on connection hooks

Context

There is currently no way to track the full lifecycle of individual connections. We should have some way for users to track those, which we can do with "hooks" or "callbacks".

Implementation

We should add a ConnectionHooks trait that users can implement for their own structs and provide to the sockets. The trait exposes the following methods, which will be called at various points in the connection lifecycle:

  • on_connection(remote_addr, auth_token): a new connection was created (or recreated). On the server side, this will inject the authentication token (replaces Authenticator trait).
  • on_disconnect(remote_addr, error): a connection was disconnected with a transient failure (error & endpoint will be injected). The connection will be retried.
  • on_termination(remote_addr, error): a connection encountered a fatal error and will not be retried.

These methods will allow implementers to track the complete lifecycle of any connections.
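A sketch of the proposed trait; the method names follow the issue text, but the exact signatures (error and token types) are assumptions:

```rust
use std::io;
use std::net::SocketAddr;

/// Hypothetical signature sketch of the proposed ConnectionHooks trait.
pub trait ConnectionHooks: Send + Sync {
    /// A new connection was created (or recreated). On the server side,
    /// `auth_token` injects the authentication token (replacing the
    /// Authenticator trait).
    fn on_connection(&self, remote_addr: SocketAddr, auth_token: Option<&[u8]>);

    /// A connection was disconnected with a transient failure (error &
    /// endpoint are injected). The connection will be retried.
    fn on_disconnect(&self, remote_addr: SocketAddr, error: &io::Error);

    /// A connection encountered a fatal error and will not be retried.
    fn on_termination(&self, remote_addr: SocketAddr, error: &io::Error);
}
```

A socket would take a `Arc<dyn ConnectionHooks>` in its options and invoke these from the driver at the corresponding lifecycle transitions.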

Benchmarking tool similar to iperf3

Context

It would be cool to have a benchmarking tool (a binary) that can spin up different sockets and benchmark throughput & latency for different message sizes and socket configurations. This would allow users to test different socket configurations (transports, buffers, etc) to find their optimal performance profile.

feat(socket): PUB/SUB implementation

The pub/sub messaging pattern has many advantages in real-time messaging applications:

  • it removes the need for polling
  • it makes service discovery easy
  • it allows for decoupling between pub and sub

As part of msg-rs we want to implement pub/sub in such a way that:

  • The SUB socket is the client and can connect to 0 to n PUB sockets, which act as the server.
  • On connecting, the SUB socket sets a filter, which will be applied on the server side (inside of the PUB socket).
  • The PUB socket posts to various topics in the form of foo.bar.baz

As with other socket types in msg-rs, the implementation must be transport agnostic.
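For the server-side filter, one plausible semantic is dot-separated prefix matching on topics of the form foo.bar.baz. This is only a sketch of that assumption, not the final msg-rs filter design:

```rust
/// Hypothetical server-side topic filter: a filter matches a topic if it is
/// equal to it, or is a dot-separated prefix of it. An empty filter matches
/// everything.
fn topic_matches(filter: &str, topic: &str) -> bool {
    if filter.is_empty() {
        return true;
    }
    // "foo.bar" matches "foo.bar" and "foo.bar.baz", but not "foo.barbell".
    topic == filter || topic.starts_with(&format!("{filter}."))
}
```

The PUB socket would apply this check per subscriber before queuing a message on that subscriber's connection.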

Tasks

Support `pubsub` compression

Context

Sockets should support compression in a modular way: socket.with_compressor(impl Compressor).
For now we'll just have one trait for both compression and decompression, though in theory we could split them up to allow one-way compression. We implement it at the message level, not the stream level: each payload is compressed individually.

/// This trait is used to implement message-level compression algorithms for payloads.
/// On outgoing messages, the payload is compressed before being sent using the `compress` method.
/// On incoming messages, the inverse happens using the `decompress` method.
pub trait Compressor: Send + Sync + Unpin + 'static {
    /// Compresses a byte slice payload into a `Bytes` object.
    fn compress(&self, data: &[u8]) -> Result<Bytes, io::Error>;

    /// Decompresses a compressed byte slice into a `Bytes` object.
    fn decompress(&self, data: &[u8]) -> Result<Bytes, io::Error>;
}

Implementation

Benchmarks

I tested the Gzip compression with an SSZ encoded block of 352239 bytes.

Below is a benchmark of the compression level, the resulting payload size, and the duration of compression/decompression (Macbook M1 Pro, release mode)

Level   Resulting Size (bytes)   Duration
1       207255                   1.53ms
2       201950                   4.27ms
3       198711                   5.26ms
6       196134                   9.75ms

docs: `msg-sim` mdbook entry

We don't yet have a book entry with docs and a guide on how to use the sim crate on both macOS and Linux. The README is already great, so it's probably fine to start by copying that over to the book and modifying it where necessary.

feat: Pub/Sub Compression type in message Headers

Instead of specifying the Decompressor expected in the sub socket, we could add a flag in the message header to indicate if the payload has been compressed, and with which algorithm.

Example:

#[derive(Debug, Clone)]
pub struct Header {
    /// Compression used for the payload
    /// - 0: none
    /// - 1: gzip
    /// - 2: zstd
    pub(crate) compression_type: u16,

    /// Size of the topic in bytes.
    pub(crate) topic_size: u16,
    /// The actual topic.
    pub(crate) topic: Bytes,
    /// The UNIX timestamp in microseconds.
    pub(crate) timestamp: u64,
    /// The message sequence number.
    pub(crate) seq: u32,
    /// The size of the message. Max 4GiB.
    pub(crate) size: u32,
}

This would allow for both a better UX for subscribers and a more granular approach to compression, enabling link-aware optimisations (e.g. #42)
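On the subscriber side, dispatch could then be a simple match on the header flag. A sketch using the example ids above (the `Compression` enum and function name are illustrative):

```rust
/// Hypothetical mapping from the header's `compression_type` flag to an
/// algorithm, following the example ids above (0 = none, 1 = gzip, 2 = zstd).
#[derive(Debug, PartialEq)]
enum Compression {
    None,
    Gzip,
    Zstd,
}

fn compression_from_header(compression_type: u16) -> Option<Compression> {
    match compression_type {
        0 => Some(Compression::None),
        1 => Some(Compression::Gzip),
        2 => Some(Compression::Zstd),
        // Unknown id: the subscriber should reject or skip the message.
        _ => None,
    }
}
```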

feat: Pub/Sub link-aware publishers

In order to implement link-aware optimisations (for instance, decide to compress messages over a link only if the latency over that link is greater than a specific threshold) we could implement the following:

  • all Publishers also subscribe to a "control topic"
  • all Subscribers also publish to that topic

Since every sub socket already retains its Stats, we would only need to publish aggregated statistics on a regular interval or every X messages received (aka "sampling").

This feature will allow for smarter, link-aware publishers.

feat(sim): Turmoil `SimulatedTcp` transport & example

Turmoil will allow us to perform network simulations for most of the msg-rs functionality.

The best way to handle this is probably by defining custom Simulated transport variants.

For this ticket, the goal is to create a SimulatedTcp transport that implements ClientTransport and ServerTransport, and use it to replicate the basic example in the context of a Turmoil simulation.

Fix `ReqSocket` connection retries

Context

In #59 we moved retry logic up to the socket level (getting rid of the durable IO abstraction). This broke retries in the ReqSocket, which relied on the lower-level retries. To fix this, move connection & reconnection logic into the ReqDriver, following the same pattern as SubSocket.

Support IPv6

Context

As of now, we only support IPv4. We should support IPv6 as well on our transport implementations.

Redesign client-side authentication flow + retry logic

Context

Authentication on the client side just means sending an auth token with the auth wire format over the stream before anything else. This should happen at the socket-level because it should be transport-agnostic. However, because we currently handle TCP reconnects at the stream-level with stubborn IO abstractions, the stream also needs to run through the auth flow every time it reconnects (and thus needs to be aware of the necessary authentication).

This is not ideal, and for example doesn't fit well with QUIC streams (that automatically handle reconnects and thus only need to authenticate once). That's why QUIC doesn't support authentication yet.

Proposal

Get rid of stubborn IO abstractions on such a low-level and introduce them at a higher level (in the socket). The transport abstraction is only used to get IO objects (normal AsyncRead + AsyncWrite streams). If one of these fails, the client socket will have some sort of ConnectionManager to retry the connection and run through the auth flow again. That way we don't leak this authentication abstraction into the lower levels.

Investigate `tokio-uring` usage

Context

A large part of CPU cycles in the benchmarks are spent on syscalls and the associated copying of data. We could (potentially) improve on this by leveraging io_uring through tokio-uring.

This should be gated behind a feature flag and only enabled on Linux (with a sufficiently recent kernel).

For more information about tokio-uring, check out the design document.
