
hyperion's Introduction

Hyperion


From the creator of SwarmBot:

How can we get 10k players to PvP at once on a Minecraft server, breaking the Guinness World Record for the largest PvP battle in any game (currently 8,825 players)?

The image below shows 100k zombies (with collisions) running at ~8 ms/tick on an M2 MacBook Pro.

Running

Step 1: The event

git clone https://github.com/andrewgazelka/hyperion
cd hyperion
cargo run --release -p infection

When you join, the server downloads a map and loads it.

Step 2: The proxy

  1. Join the Discord server
  2. Look in the #build channel for the latest proxy release
  3. Run it with ./{executable_name}. You will likely need to make it executable first with chmod +x ./{executable_name}

FAQ

Q: How is hyperion so fast?

  • Hyperion generally does a good job of utilizing all cores of your device. On an M2 MBP, CPU utilization is over 1000% when spawning hundreds of thousands of zombies.
  • We aim to utilize as much SIMD as possible. This is still a work in progress, but as it is built out we are aiming to use SIMD-friendly data structures. Make sure to compile with RUSTFLAGS='-C target-cpu=native' to allow the compiler to use SIMD intrinsics.
  • A lot of work has been done in reducing synchronization and limiting context switching. For instance, #[thread_local] is heavily relied upon.

Q: Aren't you re-inventing the wheel?

  • No, we rely on valence-protocol and evenio, an ECS framework created by rj00a, the creator of valence.
    • Although we build on this work, we do not depend on valence itself, as it is currently being rewritten to use evenio due to (among other things) performance limitations of bevy.

Q: What is the goal of this project? Making valence 2.0?

  • Nope, the goal of this project is to break the Guinness World Record. Everything else is secondary.
  • We are not implementing a 1:1 clone of a vanilla Minecraft server. We are only implementing enough to support the event, which will have 10k players.

Q: How will this handle 10k players given the network requirements?

  • The current idea is to have load balancers that handle encryption/decryption and compression/decompression and maintain a direct link to hyperion.
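As a rough illustration of the pass-through part of such a proxy, the sketch below (ports, names, and structure are hypothetical; the real proxy would also do compression and encryption where noted) simply forwards bytes between a client and the hyperion backend:

use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

// Copy bytes in one direction; this is where per-connection
// compression/decompression and encryption/decryption would live.
fn pipe(mut from: TcpStream, mut to: TcpStream) -> io::Result<()> {
    let mut buf = [0u8; 4096];
    loop {
        let n = from.read(&mut buf)?;
        if n == 0 {
            return Ok(());
        }
        to.write_all(&buf[..n])?;
    }
}

fn main() -> io::Result<()> {
    // Players connect to the proxy on the normal Minecraft port...
    let listener = TcpListener::bind("0.0.0.0:25565")?;
    for client in listener.incoming() {
        let client = client?;
        // ...and the proxy keeps a direct link to the hyperion server.
        let upstream = TcpStream::connect("127.0.0.1:35565")?;
        let (client2, upstream2) = (client.try_clone()?, upstream.try_clone()?);
        thread::spawn(move || drop(pipe(client, upstream)));
        thread::spawn(move || drop(pipe(upstream2, client2)));
    }
    Ok(())
}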

Q: Why not just use a distributed server?

  • This adds a lot of complexity, and there are always trade-offs. Of course, for an event with 10k players, real-world players are needed to see if the server can truly handle them (bots are only so realistic). If there is some inherent limiting factor, the server could be distributed, but given current performance estimates, I highly doubt making the server distributed will be the best path of action, in particular because there will most likely not be isolated regions in the world.

hyperion's People

Contributors

andrewgazelka, testingplant, ruben2424, eoghanmc22, eduxstad, loofifteen, herin049


hyperion's Issues

Support masking

Networking Packet Management in Gaming

In online games, packet handling plays a crucial role in maintaining smooth gameplay and efficient network communication. One concern is the scenario where a player sends a packet that is then echoed back to them unnecessarily. While this may not immediately disrupt gameplay, it can lead to inefficiencies and potentially impact the player experience over time.

Potential Issues

  • Redundant Data: Receiving echoed packets can result in redundant data processing and network usage.
  • Bandwidth Consumption: Unnecessary packet echoing can consume additional bandwidth without providing significant benefits.
  • Latency: Increased traffic from echoed packets may contribute to higher latency in the gameplay experience.

Recommendations

To optimize packet management and network efficiency, it is advisable to implement a masking mechanism in the networking logic. This mechanism would selectively filter out packets that do not require a response back to the originating player. By integrating this masking mechanism into the broadcast logic, developers can tailor the packet flow to prioritize relevant data transmission.

In essence, by masking specific packets intelligently, game developers can streamline network traffic, reduce unnecessary data processing, and ultimately enhance the overall gaming experience for players.
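A minimal sketch of what such a mask could look like (type names here are hypothetical; the broadcast_packets code quoted later on this page already carries an exclude_player field serving this purpose):

// Each queued broadcast packet records who originated it, if anyone.
struct QueuedPacket {
    data: Vec<u8>,
    exclude_player: Option<u128>, // originator's UUID; mask them out on send
}

// Stand-in for a real player connection; `outgoing` represents the socket writer.
struct Connection {
    uuid: u128,
    outgoing: Vec<u8>,
}

fn broadcast(packets: &[QueuedPacket], connections: &mut [Connection]) {
    for conn in connections.iter_mut() {
        for packet in packets {
            // The mask: never echo a packet back to the player who sent it.
            if packet.exclude_player == Some(conn.uuid) {
                continue;
            }
            conn.outgoing.extend_from_slice(&packet.data);
        }
    }
}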

add custom rayon-like thread pool

I think rayon might slow things down. It also complicates stack traces. My assumption is that rayon is designed for processes that are not meant to use max CPU constantly (as is common on a laptop with other processes running). However, this is a game server and will be running as a dedicated process.
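A minimal sketch of a dedicated, fixed-size pool built on std only (names hypothetical), in the spirit of what could replace rayon here:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct Pool {
    tx: mpsc::Sender<Job>,
}

impl Pool {
    // Spawn `threads` dedicated workers for this process.
    pub fn new(threads: usize) -> Self {
        let (tx, rx) = mpsc::channel::<Job>();
        let rx = Arc::new(Mutex::new(rx));
        for _ in 0..threads {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // Lock only long enough to pull one job; run it with the lock released.
                let job = { rx.lock().unwrap().recv() };
                match job {
                    Ok(job) => job(),
                    Err(_) => break, // all senders dropped: shut down
                }
            });
        }
        Self { tx }
    }

    pub fn execute(&self, job: impl FnOnce() + Send + 'static) {
        self.tx.send(Box::new(job)).expect("pool shut down");
    }
}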

Handle ioctl error properly

// TODO: Handle ioctl error properly

        Ok(())
    }

    /// This function returns the number of bytes in the TCP send queue that have
    /// not yet been transmitted to the network.
    ///
    /// It utilizes the `ioctl` system call with the `TIOCOUTQ` operation on Unix-like systems to
    /// query this information. The function safely checks the `ioctl` return value to ensure no
    /// errors occur during the operation.
    ///
    /// If running on non-Unix systems, it currently returns `0` by default.
    ///
    /// Proper error handling for `ioctl` failures should be added, and support for other operating
    /// systems needs to be considered for portability.
    pub(crate) fn queued_send(&self) -> libc::c_int {
        if cfg!(unix) {
            let mut value: libc::c_int = 0;
            // SAFETY: raw_fd is valid since the TcpStream is still alive, and value is valid to
            // write to
            unsafe {
                // TODO: Handle ioctl error properly
                assert_ne!(
                    libc::ioctl(self.raw_fd, libc::TIOCOUTQ, core::ptr::addr_of_mut!(value)),
                    -1
                );
            }
            value
        } else {
            // TODO: Support getting queued send for other OS
            0
        }
    }
}

pub struct Packets {

handle error

// todo: handle error

use evenio::{prelude::*, rayon::prelude::*};
use tracing::instrument;
use valence_protocol::VarInt;

use crate::{KillAllEntities, MinecraftEntity, Player};

#[instrument(skip_all)]
pub fn kill_all(
    _r: ReceiverMut<KillAllEntities>,
    entities: Fetcher<(EntityId, &MinecraftEntity, Not<&Player>)>,
    mut players: Fetcher<&mut Player>,
    mut s: Sender<Despawn>,
) {
    let ids = entities.iter().map(|(id, ..)| id).collect::<Vec<_>>();

    #[allow(clippy::cast_possible_wrap)]
    let entity_ids = ids.iter().map(|id| VarInt(id.index().0 as i32)).collect();

    let despawn_packet = valence_protocol::packets::play::EntitiesDestroyS2c { entity_ids };

    players.par_iter_mut().for_each(|player| {
        // todo: handle error
        let _ = player.packets.writer.send_packet(&despawn_packet);
    });

    for id in ids {
        s.send(Despawn(id));
    }
}

Determine max_bytes using the player's network speed, latency, and current send window size

let max_bytes = 25_000; // 4 Mbit/s

// TODO: Determine max_bytes using the player's network speed, latency, and current
// send window size

use std::cell::Cell;

use bytes::Bytes;
use evenio::{event::Receiver, fetch::Fetcher, query::Not};
use fastrand::Rng;
use tracing::{instrument, trace};
use valence_protocol::math::DVec2;

use crate::{
    singleton::encoder::Encoder, BroadcastPackets, FullEntityPose, MinecraftEntity, Player, Uuid,
};

#[thread_local]
static RNG: Cell<Option<Rng>> = Cell::new(None);

// TODO: Split broadcast_packets into separate functions
#[allow(clippy::cognitive_complexity)]
#[instrument(skip_all)]
pub fn broadcast_packets(
    _: Receiver<BroadcastPackets>,
    player: Fetcher<(&Uuid, &FullEntityPose, &Player, Not<&MinecraftEntity>)>,
) {
    let start = std::time::Instant::now();

    Encoder::par_drain(|encoder| {
        if encoder.necessary_packets.is_empty() && encoder.droppable_packets.is_empty() {
            return;
        }

        let start = std::time::Instant::now();

        let mut rng = RNG.take().unwrap_or_default();

        // TODO: Avoid taking packet_data so that the capacity can be reused
        let packet_data = Bytes::from(core::mem::take(&mut encoder.packet_data));

        for (player_uuid, pose, player, _) in &player {
            let player_location = DVec2::new(pose.position.x, pose.position.y);

            // Max bytes that should be sent this tick
            // TODO: Determine max_bytes using the player's network speed, latency, and current
            // send window size
            let max_bytes = 25_000; // 4 Mbit/s
            let mut total_bytes_sent = 0;

            for packet in &encoder.necessary_packets {
                if packet.exclude_player == Some(player_uuid.0) {
                    continue;
                }

                if player
                    .packets
                    .writer
                    .send_raw(packet_data.slice(packet.offset..packet.offset + packet.len))
                    .is_err()
                {
                    return;
                }
                total_bytes_sent += packet.len;
            }

            if total_bytes_sent < max_bytes {
                let all_droppable_packets_len = encoder
                    .droppable_packets
                    .iter()
                    .map(|packet| packet.len)
                    .sum::<usize>();
                if all_droppable_packets_len + total_bytes_sent <= max_bytes {
                    for packet in &encoder.droppable_packets {
                        if packet.exclude_player == Some(player_uuid.0) {
                            continue;
                        }

                        if player
                            .packets
                            .writer
                            .send_raw(packet_data.slice(packet.offset..packet.offset + packet.len))
                            .is_err()
                        {
                            return;
                        }

                        // total_bytes_sent is not increased because it is no longer used
                    }
                } else {
                    // todo: remove shuffling; this is inefficient
                    rng.shuffle(&mut encoder.droppable_packets);
                    for packet in &encoder.droppable_packets {
                        if packet.exclude_player == Some(player_uuid.0) {
                            continue;
                        }

                        // TODO: Determine chance better
                        // This currently picks packets closest to the front more often than
                        // packets in the back. To compensate for this, droppable_packets is
                        // shuffled, but ideally shuffling shouldn't be necessary.
                        let distance_squared =
                            packet.prioritize_location.distance_squared(player_location);
                        let chance = (1.0 / distance_squared).clamp(0.05, 1.0);
                        let chance_u8 = (chance * 255.0) as u8;
                        let keep = rng.u8(..) > chance_u8;

                        if !keep {
                            continue;
                        }

                        if total_bytes_sent + packet.len > max_bytes {
                            // In theory, this loop could keep going if the current packet is large
                            // and the rest of the packets are small. However, most of these
                            // droppable packets are small, so it's not worth it to check the rest
                            // of the packets.
                            break;
                        }

                        if player
                            .packets
                            .writer
                            .send_raw(packet_data.slice(packet.offset..packet.offset + packet.len))
                            .is_err()
                        {
                            return;
                        }

                        total_bytes_sent += packet.len;
                    }
                }
            }
        }

        RNG.set(Some(rng));
        encoder.clear_packets();

        trace!(
            "took {:?} to broadcast packets with specific encoder",
            start.elapsed()
        );
    });
    trace!("took {:?} to broadcast packets", start.elapsed());
}
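One hypothetical way to replace the hard-coded 25_000: derive the per-tick budget from the measured send speed (the atomic speed counter updated by the io task elsewhere in this repo) and the bytes still sitting in the kernel send queue (queued_send). The names and exact policy below are assumptions, not the repo's API:

fn max_bytes_for_tick(
    measured_bytes_per_sec: u32, // e.g. the speed value stored by the io task
    queued_send: u32,            // unsent bytes reported by TIOCOUTQ / SO_NWRITE
    ticks_per_sec: u32,          // 20 for vanilla Minecraft
) -> usize {
    let per_tick = measured_bytes_per_sec / ticks_per_sec.max(1);
    // If the kernel send queue is already backed up, shrink this tick's budget
    // so we do not keep growing the player's latency.
    per_tick.saturating_sub(queued_send) as usize
}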

implement full login sequence

https://wiki.vg/index.php?title=Protocol_FAQ&oldid=18181

Client connects to the server
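A rough outline of the offline-mode sequence from the Protocol FAQ, expressed as connection states (the state names here are illustrative, not the crate's actual types):

enum ConnectionState {
    // Client opens the TCP connection and sends Handshake with next_state = Login.
    Handshaking,
    // Client sends Login Start; the server replies with (optionally) Set Compression
    // followed by Login Success (the LoginSuccessS2c packet used elsewhere in this repo).
    Login,
    // After Login Success the connection switches to Play: the server sends
    // Join Game, the initial chunk data, and the player's spawn position.
    Play,
}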

Split broadcast_packets into separate functions

// TODO: Split broadcast_packets into separate functions

(The quoted broadcast_packets source is the same listing shown above under "Determine max_bytes using the player's network speed, latency, and current send window size".)

Handle getsockopt error properly

// TODO: Handle getsockopt error properly

    }

    /// This function returns the number of bytes in the TCP send queue that have
    /// been sent but have not been acknowledged by the client.
    ///
    /// If running on non-Unix systems, it currently returns `0` by default.
    ///
    /// Proper error handling for `ioctl` failures should be added, and support for other operating
    /// systems needs to be considered for portability.
    pub(crate) fn queued_send(&self) -> libc::c_int {
        #[cfg(target_os = "linux")]
        {
            let mut value: libc::c_int = 0;
            // SAFETY: raw_fd is valid since the TcpStream is still alive, and value is valid to
            // write to
            unsafe {
                // TODO: Handle ioctl error properly
                assert_ne!(
                    libc::ioctl(self.raw_fd, libc::TIOCOUTQ, addr_of_mut!(value)),
                    -1
                );
            }
            value
        }

        #[cfg(target_os = "macos")]
        {
            let mut value: libc::c_int = 0;
            let mut len: libc::socklen_t = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
            // SAFETY: raw_fd is valid since the TcpStream is still alive, value and len are valid
            // to write to, and value and len do not alias
            unsafe {
                // TODO: Handle getsockopt error properly
                assert_ne!(
                    libc::getsockopt(
                        self.raw_fd,
                        libc::SOL_SOCKET,
                        libc::SO_NWRITE,
                        addr_of_mut!(value).cast(),
                        addr_of_mut!(len)
                    ),
                    -1
                );
            }
            value
        }

        // TODO: Support getting queued send for other OS
    }
}

enable packets & logic for multiplayer

Multiplayer Game Development Update

Currently, in the multiplayer game, all players are invisible to each other, and every packet sent is essentially a broadcast packet. This broadcast is not localized, meaning any change in a player's or entity's location is sent to every player, regardless of their distance from the event. While this setup suffices for simple tests, it poses challenges when applied to a larger game world.

To address this issue, a new branch has been created to implement logic that optimizes network traffic by only sending relevant information to each player. The aim is to enhance the game's performance by streamlining packet distribution based on the players' locations and interactions.

https://github.com/andrewgazelka/hyperion/tree/feat-broadcast/broadcast/
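A minimal sketch of the localized-broadcast idea (the view radius, struct names, and byte sink are placeholders): only players within some radius of the change receive the update.

const VIEW_RADIUS: f64 = 128.0; // placeholder view distance in blocks

struct PlayerView {
    x: f64,
    z: f64,
    outgoing: Vec<u8>, // stand-in for the real packet writer
}

fn broadcast_localized(origin: (f64, f64), data: &[u8], players: &mut [PlayerView]) {
    let radius_squared = VIEW_RADIUS * VIEW_RADIUS;
    for player in players.iter_mut() {
        let (dx, dz) = (player.x - origin.0, player.z - origin.1);
        // Skip players too far away from where the change happened.
        if dx * dx + dz * dz <= radius_squared {
            player.outgoing.extend_from_slice(data);
        }
    }
}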

Determine chance better

// This currently picks packets closest to the front more often than
// packets in the back. To compensate for this, droppable_packets is
// shuffled, but ideally shuffling shouldn't be necessary.

// In theory, this loop could keep going if the current packet is large
// and the rest of the packets are small. However, most of these
// droppable packets are small, so it's not worth it to check the rest
// of the packets.

// TODO: Determine chance better

(The quoted broadcast_packets source is the same listing shown above under "Determine max_bytes using the player's network speed, latency, and current send window size".)
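One possible direction for removing the shuffle entirely, sketched below under assumed names: rank droppable packets by distance to the player and take them greedily until the byte budget is used up, instead of shuffling and dropping probabilistically.

struct Droppable {
    len: usize,
    distance_squared: f64,
}

// Returns the indices of the packets to send, closest first, within `budget` bytes.
fn pick_droppable(packets: &[Droppable], budget: usize) -> Vec<usize> {
    let mut order: Vec<usize> = (0..packets.len()).collect();
    order.sort_by(|&a, &b| {
        packets[a]
            .distance_squared
            .total_cmp(&packets[b].distance_squared)
    });

    let mut used = 0;
    let mut chosen = Vec::new();
    for index in order {
        if used + packets[index].len <= budget {
            used += packets[index].len;
            chosen.push(index);
        }
    }
    chosen
}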

Device not configured: `libc::ioctl`

❯ cargo run --release
   Compiling chunk v0.1.0 (/Users/andrewgazelka/Projects/personal/hyperion/chunk)
   Compiling server v0.1.0 (/Users/andrewgazelka/Projects/personal/hyperion/server)
    Finished `release` profile [optimized] target(s) in 12.54s
     Running `target/release/server`
thread 'io' panicked at server/src/io.rs:256:17:
assertion `left != right` failed
  left: -1
 right: -1
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

This occurs on my MBP Apple M2 Max on Sonoma.

    pub(crate) fn queued_send(&self) -> Result<libc::c_int, Errno> {
        if cfg!(unix) {
            let mut value: libc::c_int = 0;
            // SAFETY: raw_fd is valid since the TcpStream is still alive, and value is valid to
            // write to
            let result = unsafe {
                libc::ioctl(self.raw_fd, libc::TIOCOUTQ, core::ptr::addr_of_mut!(value))
                // libc::ioctl(self.raw_fd, libc::TIOCOUTQ, &mut value as *mut _)
            };
            if result == -1 {
                // Error occurred, extract errno
                Err(errno())
            } else {
                Ok(value)
            }
        } else {
            // TODO: Support getting queued send for other OS
            Err(Errno(libc::ENOSYS)) // Return an error indicating the operation is not implemented
        }
    }
thread 'io' panicked at server/src/io.rs:452:62:
called `Result::unwrap()` on an `Err` value: Errno { code: 6, description: Some("Device not configured") }
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

random

let uuid = offline_uuid(&username)?; // todo: random

        let username: Box<str> = Box::from(username.0);

        let uuid = offline_uuid(&username)?; // todo: random

        let packet = LoginSuccessS2c {
            uuid,
            username: Bounded::from(&*username),
            properties: Cow::default(),
        };
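A minimal sketch of the random alternative, assuming the uuid crate with the v4 feature enabled (whether that crate is the right fit here is an open question):

use uuid::Uuid;

// Instead of deriving the UUID deterministically from the username, hand out
// a random v4 UUID per login.
fn random_player_uuid() -> Uuid {
    Uuid::new_v4()
}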

Support getting queued send for other OS

// TODO: Support getting queued send for other OS

(The quoted queued_send implementation is the same as the one shown above under "Handle ioctl error properly".)

Add packet sending benchmarks

It's really important to benchmark packet sending, and that probably needs a lot of bots, so I might try to actually benchmark it with SwarmBot and have that in one of the benches.
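A hypothetical skeleton for such a bench using criterion (the crate and the measured closure are assumptions; a real benchmark would drive SwarmBot bots against a running server):

use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn bench_packet_send(c: &mut Criterion) {
    let payload = vec![0u8; 1024]; // placeholder packet bytes

    c.bench_function("send_1kib_packet", |b| {
        b.iter(|| {
            // Stand-in for the real send path (e.g. the packet writer's send_raw).
            black_box(&payload);
        });
    });
}

criterion_group!(benches, bench_packet_send);
criterion_main!(benches);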

remove all unwraps and ignored errors

It's very important to remove all unwraps and ignored errors. For instance, a `let _ =` is potentially bad because we are ignoring the error. We don't want to do that; we want to handle every single error with more than just logging.

For example, if a player sends an invalid packet, we probably want to message that player or disconnect them. We may not always want to just ignore it. If a player's TCP socket gets interrupted or closes, we want to handle that situation by making sure we can still send every player a disconnect packet. Therefore, it would be a good idea to remove all the allows for the unwraps and try to eliminate most unwraps from the code base.
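A minimal sketch of the shape this could take (decode_chat and the kick callback are stand-ins): the error is propagated instead of ignored, and the player is told why they were disconnected.

use anyhow::{bail, Result};

struct Chat {
    message: String,
}

// Stand-in for a real packet decoder.
fn decode_chat(raw: &[u8]) -> Result<Chat> {
    let message = std::str::from_utf8(raw)?.to_owned();
    if message.is_empty() {
        bail!("empty chat packet");
    }
    Ok(Chat { message })
}

// No `unwrap()` and no `let _ =`: every error turns into an explicit action.
fn on_chat_packet(raw: &[u8], kick: &mut dyn FnMut(String)) {
    match decode_chat(raw) {
        Ok(chat) => println!("<player> {}", chat.message),
        Err(e) => kick(format!("invalid packet: {e}")),
    }
}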

pretty sure there are edge cases that are not considered io read/write

System Design Considerations and Future Improvements

When it comes to the current process of reading and writing data, there are various complexities and invariants that must be maintained for the system to function correctly. It is crucial to address these potential pitfalls to prevent errors caused by overlooking these intricate invariants that can be challenging to debug.

One specific area of concern is the use of a thread-local encoder for both broadcast and local packets. This dual usage introduces the risk of errors, especially when handling serialization and removal of local packets post-serialization. The intricacies of this logic pose a significant opportunity for mistakes to occur.

To mitigate these issues, there is ongoing work on the broadcast branch aimed at enhancing the system's robustness. The improvements implemented in this branch are expected to address some of the existing challenges and enhance the overall reliability of the data reading and writing processes.

https://github.com/andrewgazelka/hyperion/tree/feat-broadcast/broadcast/

Avoid taking packet_data so that the capacity can be reused

// TODO: Avoid taking packet_data so that the capacity can be reused

(The quoted broadcast_packets source is the same listing shown above under "Determine max_bytes using the player's network speed, latency, and current send window size".)
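One possible shape of the fix, assuming the encoder's buffer were a bytes::BytesMut instead of a Vec (names below are illustrative, not the repo's types): split off this tick's frozen bytes and keep the now-empty buffer around so its allocation can be reclaimed on the next reserve.

use bytes::{Bytes, BytesMut};

struct TickEncoder {
    packet_data: BytesMut,
}

impl TickEncoder {
    // Hand out an immutable view of this tick's bytes without discarding the buffer.
    fn take_packet_data(&mut self) -> Bytes {
        self.packet_data.split().freeze()
    }

    // On the next tick, reserve can reuse the old allocation once all Bytes
    // handles from the previous tick have been dropped.
    fn prepare(&mut self, needed: usize) {
        self.packet_data.reserve(needed);
    }
}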

is Cow best practice here?

// todo: is Cow best practice here?

use std::borrow::Cow;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct RangeInclusive<T = u32> {
    pub start: T,
    pub end: T,
}

impl<T> RangeInclusive<T> {
    pub const fn new(start: T, end: T) -> Self {
        Self { start, end }
    }
}

struct GroupIterator<'a> {
    input: Cow<'a, [u32]>,
    current_pos: usize,
}

impl<'a> Iterator for GroupIterator<'a> {
    type Item = RangeInclusive<u32>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.current_pos >= self.input.len() {
            return None;
        }

        let start = self.input[self.current_pos];
        let mut end = start;

        while self.current_pos + 1 < self.input.len() && self.input[self.current_pos + 1] == end + 1
        {
            self.current_pos += 1;
            end = self.input[self.current_pos];
        }

        self.current_pos += 1; // Prepare for the next range

        Some(RangeInclusive { start, end })
    }
}

// todo: could probably make this more SIMD friendly
// todo: is Cow best practice here?
pub fn group<'a>(
    input: impl Into<Cow<'a, [u32]>>,
) -> impl Iterator<Item = RangeInclusive<u32>> + 'a {
    let input = input.into();
    debug_assert!(input.windows(2).all(|w| w[0] < w[1]));

    GroupIterator {
        input,
        current_pos: 0,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_single_element_groups() {
        let input = vec![1, 3, 5];
        let result: Vec<_> = group(input).collect();
        assert_eq!(result, vec![
            RangeInclusive::new(1, 1),
            RangeInclusive::new(3, 3),
            RangeInclusive::new(5, 5),
        ]);
    }

    #[test]
    fn test_multiple_element_groups() {
        let input = vec![1, 2, 3, 6, 8];
        let result: Vec<_> = group(&input).collect();
        assert_eq!(result, vec![
            RangeInclusive::new(1, 3),
            RangeInclusive::new(6, 6),
            RangeInclusive::new(8, 8),
        ]);
    }

    #[test]
    fn test_empty_input() {
        let input: Vec<u32> = vec![];
        let result: Vec<_> = group(&input).collect();
        assert_eq!(result, Vec::<RangeInclusive<u32>>::new());
    }

    #[test]
    fn test_consecutive_and_nonconsecutive_mix() {
        let input = vec![1, 2, 3, 5, 6, 7, 9];
        let result: Vec<_> = group(&input).collect();
        assert_eq!(result, vec![
            RangeInclusive::new(1, 3),
            RangeInclusive::new(5, 7),
            RangeInclusive::new(9, 9),
        ]);
    }

    #[test]
    fn test_large_range() {
        let input = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let result: Vec<_> = group(&input).collect();
        assert_eq!(result, vec![RangeInclusive::new(1, 10),]);
    }
}

(low prio) scripting language to interface?

Decide if it makes sense to have a scripting language interface with this tool. I think it would be interesting to look into. There are a couple of options. Lua could be used. Another option would be to support WebAssembly and have a WebAssembly executor that would run code at native performance. In that case, we could support a lot of different languages, but I don't know if that would be worth it. I'm not exactly sure how that would work.

figure out how partitions are going to be implemented

Architecture Planning for Parallel Processing

Currently, the approach involves implementing parallelism at the entity level. The original concept, however, aimed at partitioning the world into distinct segments optimized for parallel processing. For instance, if there are 32 cores available, the idea is to have 32 partitions of the world that can operate concurrently.

Considerations and Challenges:

  • Efficiency: Assess the efficiency gains and trade-offs between entity-level parallelism and world partitioning.
  • Transition Handling: Ensure seamless transitions between partitions to prevent conflicts and maintain data integrity.
  • Concurrency Issues: Address concerns related to simultaneous access to shared resources within different partitions.

Future Steps:

  1. Evaluation: Determine the performance impact of transitioning from entity-level parallelism to world partitioning.
  2. Optimization: Explore optimization techniques such as RGB coloring or quadtree structures to manage entity transitions between partitions efficiently.
  3. Priority: Assess the priority of this optimization relative to other development needs.

By addressing these considerations, the system can potentially leverage parallel processing at a larger scale while mitigating potential concurrency challenges.
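A small sketch of one possible partitioning scheme (the hash and the chunk-based assignment are assumptions, not a decided design): map each entity's chunk column to one of N partitions so each partition can be ticked on its own core.

// Map a world position to a partition index in 0..partitions.
fn partition_index(x: f64, z: f64, partitions: usize) -> usize {
    let chunk_x = (x as i64) >> 4;
    let chunk_z = (z as i64) >> 4;
    // Simple mix of the chunk coordinates; a real scheme would try to keep
    // neighbouring chunks in the same partition to limit cross-partition work.
    let mixed = (chunk_x as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) ^ (chunk_z as u64);
    (mixed % partitions as u64) as usize
}

// Bucket entity indices by partition so each bucket can be processed in parallel.
fn partition_entities(positions: &[(f64, f64)], partitions: usize) -> Vec<Vec<usize>> {
    let mut buckets = vec![Vec::new(); partitions];
    for (entity, &(x, z)) in positions.iter().enumerate() {
        buckets[partition_index(x, z, partitions)].push(entity);
    }
    buckets
}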

init

#[thread_local]

static BROADCASTER: RefCell<Option<Broadcaster>> = RefCell::new(None);

    }
}

// // todo init
// #[thread_local]
// static BROADCASTER: RefCell<Option<Broadcaster>> = RefCell::new(None);

#[thread_local]
static ENCODER: Cell<PacketBuffer> = Cell::new(PacketBuffer::new());

update speed storage to use player ping

// This will make the estimated speed slightly lower than the actual speed, but
// it makes measuring speed more practical because the server will send packets
// to the client more often than 1 second

// todo: clarify why 1 second?

        });

        monoio::spawn(async move {
            let mut past_queued_send = 0;
            let mut past_instant = Instant::now();
            while let Ok(bytes) = s2c_rx.recv_async().await {
                let len = bytes.len();
                if let Err(e) = io_write.send_packet(bytes).await {
                    error!("{e:?}");
                    break;
                }
                let elapsed = past_instant.elapsed();

                // todo: clarify why 1 second?
                if elapsed > Duration::from_secs(1) {
                    let queued_send = io_write.queued_send();
                    speed.store(
                        ((past_queued_send - queued_send) as f32 / elapsed.as_secs_f32()) as u32,
                        Ordering::Relaxed,
                    );
                    past_queued_send = io_write.queued_send();
                    past_instant = Instant::now();
                } else {
                    // This will make the estimated speed slightly lower than the actual speed, but
                    // it makes measuring speed more practical because the server will send packets
                    // to the client more often than 1 second
                    #[allow(clippy::cast_possible_wrap)]
                    {
                        past_queued_send += len as libc::c_int;
                    }
                }
            }
        });

add system for registering chat commands

allow registering chat command handlers or some way that we can make it not all one giant code glob (separation of concerns)

fn chat_command(
    mut data: &[u8],
    player: &mut Player,
    full_entity_pose: &FullEntityPose,
    sender: &mut Sender<(KickPlayer, InitEntity, KillAllEntities)>,
) -> anyhow::Result<()> {
    const BASE_RADIUS: f32 = 4.0;

    let pkt = play::CommandExecutionC2s::decode(&mut data)?;

    let mut cmd = pkt.command.0.split(' ');
    let first = cmd.next();

    if first == Some("ka") {
        sender.send(KillAllEntities);
    }

    if first == Some("spawn") {
        let args: Vec<_> = cmd.collect();
        let loc = full_entity_pose.position;

        let [x, y, z] = match args.as_slice() {
            &[x, y, z] => [x.parse()?, y.parse()?, z.parse()?],
            [x] => {
                let count = x.parse()?;

                // normalize over the number
                #[expect(clippy::cast_possible_truncation, reason = "sqrt of f64 is f32")]
                let radius = BASE_RADIUS * f64::from(count).sqrt() as f32;

                for _ in 0..count {
                    // spawn in 100 block radius
                    let x = (rand::random::<f32>() - 0.5).mul_add(radius, loc.x);
                    let y = loc.y;
                    let z = (rand::random::<f32>() - 0.5).mul_add(radius, loc.z);

                    sender.send(InitEntity {
                        pose: FullEntityPose {
                            position: Vec3::new(x, y, z),
                            yaw: 0.0,
                            pitch: 0.0,
                            bounding: BoundingBox::create(Vec3::new(x, y, z), 0.6, 1.8),
                        },
                    });
                }

                return Ok(());
            }
            [] => [HybridPos::Relative(0.0); 3],
            _ => bail!("expected 3 numbers"),
        };

        let x = match x {
            HybridPos::Absolute(x) => x,
            HybridPos::Relative(x) => loc.x + x,
        };

        let y = match y {
            HybridPos::Absolute(y) => y,
            HybridPos::Relative(y) => loc.y + y,
        };

        let z = match z {
            HybridPos::Absolute(z) => z,
            HybridPos::Relative(z) => loc.z + z,
        };

        player
            .packets
            .writer
            .send_chat_message(&format!("Spawning zombie at {x}, {y}, {z}"))?;

        sender.send(InitEntity {
            pose: FullEntityPose {
                position: Vec3::new(x, y, z),
                yaw: 0.0,
                pitch: 0.0,
                bounding: BoundingBox::create(Vec3::new(x, y, z), 0.6, 1.8),
            },
        });
    }

    Ok(())
}
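A minimal sketch of a registry that keeps each command in its own handler (names and the handler signature are hypothetical; the real version would also pass the player, pose, and sender through):

use std::collections::HashMap;

type CommandHandler = fn(args: &[&str]) -> anyhow::Result<()>;

#[derive(Default)]
struct CommandRegistry {
    handlers: HashMap<&'static str, CommandHandler>,
}

impl CommandRegistry {
    fn register(&mut self, name: &'static str, handler: CommandHandler) {
        self.handlers.insert(name, handler);
    }

    // Look up the first word of the command and dispatch to its handler,
    // instead of one giant if/else chain in chat_command.
    fn dispatch(&self, command: &str) -> anyhow::Result<()> {
        let mut parts = command.split(' ');
        let name = parts.next().unwrap_or_default();
        let args: Vec<&str> = parts.collect();
        match self.handlers.get(name) {
            Some(handler) => handler(&args),
            None => anyhow::bail!("unknown command: {name}"),
        }
    }
}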

add miri

It would be a very good idea to add Miri support at some point for the unsafe code, just to verify that it works properly and isn't secretly relying on undefined behavior we didn't realize was there.

MVP Tracking issue

MVP: Overcast Network-inspired Game

play.oc.tc

Overview

  • Inspiration: Overcast Network, featuring games like Destroy the Obsidian and Capture the Wool
  • Requirements Based on Game Concept:
    • Fixed world size
    • Capacity for up to 10,000 players
    • Support for mining and placing blocks
    • Possible inclusion of fluids, bows, and items
    • Exclusion of entities with complex logic or unusual bounding boxes (e.g., Ender Dragons)
    • Minimal support for numerous NPCs, particularly in initial stages of development

move `*.zst` to separate repo

I don't want to have to deal with them taking up a bunch of space, and there are issues with LFS and contributors. #78

Possibly the other repo could have LFS.
