I wrote a TLS server, but occasionally the program gets stuck after a few days of running in production.
The TCP listening port is 50011.
I found that the Recv-Q for that port is always 1025.
impl DiscoveryServer {
pub async fn new(s: &Settings, n: &Arc<RwLock<HashMap<String, NodeDescription>>>) -> Self {
log::info!(
"create discovery server listener on {:?}",
format!("{}:{}", "0.0.0.0", s.server.listen_port)
);
DiscoveryServer {
tcp_socket: new_listener(format!("{}:{}", "0.0.0.0", s.server.listen_port), false)
.await
.unwrap(),
settings: s.clone(),
nodes: n.clone(),
}
}
pub async fn start(self) -> ResultType<()> {
log::info!("start discovery server");
let tls_acceptor = new_tls_acceptor();
tokio::spawn(async move {
loop {
match self.tcp_socket.accept().await {
Ok((stream, addr)) => {
let acceptor = tls_acceptor.clone();
let res_servers = self.nodes.clone();
let res_cities = self.settings.config_item.city_list.clone();
tokio::spawn(async move {
match TlsFrameStream::from(stream, acceptor).await {
Ok(mut tls_stream) => {
if let Some(Ok(bytes)) = tls_stream.next_timeout(MESSAGE_TIMEOUT).await {
if let Ok(msg_in) = DiscoveryMessage::parse_from_bytes(&bytes) {
match msg_in.union {
Some(discovery_message::Union::request(req)) => {
log::info!("msg from client:{}, request:{}", addr, req);
handle_request(&res_servers, res_cities, tls_stream, req).await;
}
_ => {
log::warn!("unknown union type from msg_in, type:{:?}", msg_in.union);
}
}
}
}
},
Err(e) => log::error!("error accept client, err: {}", e),
}
});
}
Err(err) => {
log::error!("error accept tcp socket, err: {}", err);
}
}
}
});
Ok(())
}
}
/// Reads every certificate from the PEM file at `filename`.
///
/// # Panics
///
/// Panics if the file cannot be opened or its PEM content cannot be parsed.
pub fn load_certs(filename: &str) -> Vec<rustls::Certificate> {
    let mut pem = BufReader::new(File::open(filename).expect("cannot open certificate file"));
    let der_blobs = rustls_pemfile::certs(&mut pem).unwrap();
    der_blobs.into_iter().map(rustls::Certificate).collect()
}
/// Returns the first RSA or PKCS#8 private key found in the PEM file at
/// `filename`; all other PEM items are skipped.
///
/// # Panics
///
/// Panics if the file cannot be opened, cannot be parsed, or holds no
/// RSA/PKCS#8 key.
pub fn load_private_key(filename: &str) -> rustls::PrivateKey {
    let mut pem = BufReader::new(File::open(filename).expect("cannot open private key file"));
    while let Some(item) =
        rustls_pemfile::read_one(&mut pem).expect("cannot parse private key .pem file")
    {
        match item {
            rustls_pemfile::Item::RSAKey(der) | rustls_pemfile::Item::PKCS8Key(der) => {
                return rustls::PrivateKey(der);
            }
            // Certificates and any other item kinds are not keys; keep scanning.
            _ => {}
        }
    }
    panic!(
        "no keys found in {:?} (encrypted keys not supported)",
        filename
    );
}
/// Resolves `host:port` and returns the first IPv4 socket address.
///
/// # Panics
///
/// Panics if resolution fails or yields no IPv4 address.
pub fn lookup_ipv4(host: &str, port: u16) -> SocketAddr {
    let mut candidates = (host, port).to_socket_addrs().unwrap();
    match candidates.find(SocketAddr::is_ipv4) {
        Some(v4) => v4,
        None => unreachable!("Cannot lookup address"),
    }
}
/// Builds the rustls client configuration: trust anchors from `ca_file`,
/// client certificate chain from `certs_file`, private key from `key_file`,
/// using rustls' default cipher suites and protocol versions.
///
/// # Panics
///
/// Panics if any of the files cannot be read or parsed, or if rustls rejects
/// the suite/version combination or the certificate/key pair.
fn make_client_config(
    ca_file: &str,
    certs_file: &str,
    key_file: &str,
) -> Arc<rustls::ClientConfig> {
    let mut ca_reader = BufReader::new(File::open(ca_file).expect("Cannot open CA file"));
    let ca_ders = rustls_pemfile::certs(&mut ca_reader).unwrap();
    let mut trusted_roots = RootCertStore::empty();
    trusted_roots.add_parsable_certificates(&ca_ders);

    let client_chain = load_certs(certs_file);
    let client_key = load_private_key(key_file);

    Arc::new(
        rustls::ClientConfig::builder()
            .with_cipher_suites(rustls::DEFAULT_CIPHER_SUITES)
            .with_safe_default_kx_groups()
            .with_protocol_versions(rustls::DEFAULT_VERSIONS)
            .expect("inconsistent cipher-suite/versions selected")
            .with_root_certificates(trusted_roots)
            .with_single_cert(client_chain, client_key)
            .expect("invalid client auth certs/key"),
    )
}
/// Builds the rustls server configuration from the certificate chain at
/// `certs` and the private key at `key_file`.
///
/// Client authentication is disabled, every cipher suite and protocol version
/// rustls knows is offered, key logging goes through `rustls::KeyLogFile`, and
/// the in-memory session cache is sized at 256.
///
/// # Panics
///
/// Panics if the files cannot be loaded or rustls rejects the configuration.
fn make_server_config(certs: &str, key_file: &str) -> Arc<rustls::ServerConfig> {
    let verifier = NoClientAuth::new();
    let chain = load_certs(certs);
    let key = load_private_key(key_file);

    let mut server_config = rustls::ServerConfig::builder()
        .with_cipher_suites(rustls::ALL_CIPHER_SUITES)
        .with_safe_default_kx_groups()
        .with_protocol_versions(rustls::ALL_VERSIONS)
        .expect("inconsistent cipher-suites/versions specified")
        .with_client_cert_verifier(verifier)
        .with_single_cert_with_ocsp_and_sct(chain, key, vec![], vec![])
        .expect("bad certificates/private key");

    server_config.key_log = Arc::new(rustls::KeyLogFile::new());
    server_config.session_storage = rustls::server::ServerSessionMemoryCache::new(256);
    Arc::new(server_config)
}
/// Opens a TCP connection to `addr` and performs a TLS client handshake,
/// verifying the server against `domain`.
///
/// `ca_file`, `cert_file` and `key_file` are PEM paths handed to
/// `make_client_config`, which reads them synchronously on every call and
/// panics if they are missing or malformed.
///
/// # Errors
///
/// Returns an error if `domain` is not a valid server name, if the TCP
/// connection fails, or if the TLS handshake fails.
pub async fn new_tls_stream(
    domain: &str,
    addr: std::net::SocketAddr,
    ca_file: &str,
    cert_file: &str,
    key_file: &str,
) -> ResultType<ClientTlsStream<TcpStream>> {
    let connector = TlsConnector::from(make_client_config(ca_file, cert_file, key_file));
    // Validate the name before dialing so a bad `domain` is reported as an
    // error (not a panic) and no socket is opened for nothing.
    let server_name = rustls::ServerName::try_from(domain)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid dnsname"))?;
    let tcp_stream = TcpStream::connect(&addr).await?;
    let tls_stream = connector.connect(server_name, tcp_stream).await?;
    Ok(tls_stream)
}
/// Creates the TLS acceptor from the server certificate and key paths in `CERT`.
pub fn new_tls_acceptor() -> TlsAcceptor {
    TlsAcceptor::from(make_server_config(
        CERT.server_cert_file,
        CERT.server_key_file,
    ))
}
/// A TLS connection in either the client or the server role.
///
/// The constructors in this file populate exactly one of the two stream
/// fields: `new_for_client` sets `client_stream`, `from` sets `server_stream`.
pub struct TlsFrameStream {
    /// Client-side TLS stream; `Some` when built by `new_for_client`.
    pub client_stream: Option<ClientTlsStream<TcpStream>>,
    /// Server-side TLS stream; `Some` when built by `from`.
    pub server_stream: Option<ServerTlsStream<TcpStream>>,
    /// Address of the remote end, used in log messages.
    peer_addr: SocketAddr,
}
impl TlsFrameStream {
pub async fn from(stream: TcpStream, acceptor: TlsAcceptor) -> ResultType<Self> {
let addr = stream.peer_addr()?;
let tls_stream = match acceptor.accept(stream).await {
Ok(tls_stream) => tls_stream,
Err(e) => {
return Err(anyhow!("accept stream failed.., error: {:?}", e));
}
};
Ok(TlsFrameStream {
client_stream: None,
server_stream: Some(tls_stream),
peer_addr: addr,
})
}
pub async fn new_for_client(server_addr: SocketAddr, ms_timeout: u64) -> ResultType<Self> {
let tls_stream = super::timeout(
ms_timeout,
new_tls_stream(
"localhost",
server_addr,
CERT.ca_file,
CERT.client_cert_file,
CERT.client_key_file,
),
)
.await??;
Ok(TlsFrameStream {
client_stream: Some(tls_stream),
server_stream: None,
peer_addr: server_addr,
})
}
#[inline]
pub async fn next(&mut self) -> Option<Result<BytesMut, Error>> {
let mut bytes = BytesMut::with_capacity(DEFAULT_BUFFER_SIZE);
match self.client_stream.as_mut() {
None => {}
Some(stream) => {
stream.read_buf(&mut bytes).await.unwrap();
return Some(Ok(bytes));
}
};
match self.server_stream.as_mut() {
None => None,
Some(stream) => {
stream.read_buf(&mut bytes).await.unwrap();
return Some(Ok(bytes));
}
}
}
#[inline]
pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> {
self.send_raw(msg.write_to_bytes()?).await
}
#[inline]
pub async fn send_raw(&mut self, msg: Vec<u8>) -> ResultType<()> {
match self.client_stream.as_mut() {
None => {}
Some(stream) => {
stream.write_all(&msg).await.unwrap();
return Ok(());
}
}
match self.server_stream.as_mut() {
None => return Ok(()),
Some(stream) => {
stream.write_all(&msg).await.unwrap();
return Ok(());
}
}
}
#[inline]
pub async fn next_timeout(&mut self, ms: u64) -> Option<Result<BytesMut, Error>> {
if let Ok(res) =
tokio::time::timeout(std::time::Duration::from_millis(ms), self.next()).await
{
res
} else {
None
}
}
pub async fn shutdown(&mut self) -> ResultType<()> {
log::info!("shutdown connection {:?}", self.peer_addr);
match self.client_stream.as_mut() {
None => {}
Some(stream) => {
stream.shutdown().await?;
return Ok(());
}
}
match self.server_stream.as_mut() {
None => return Ok(()),
Some(stream) => {
stream.shutdown().await?;
return Ok(());
}
}
}
}
impl Drop for TlsFrameStream {
fn drop(&mut self) {
match block_on(self.shutdown()) {
Err(e) => {
log::error!("close connection {:?} failed, reson: {:?}", self.peer_addr, e);
}
_ => {}
};
}
}