diff --git a/nzr-api/src/event/server.rs b/nzr-api/src/event/server.rs index d096eca..f790d95 100644 --- a/nzr-api/src/event/server.rs +++ b/nzr-api/src/event/server.rs @@ -1,3 +1,4 @@ +use futures::Future; use std::{fmt, io, pin::Pin}; use futures::SinkExt; @@ -114,6 +115,17 @@ impl EventServer { Self { channel } } + /// Returns a future that returns [`Poll::Pending`] until the client count falls below the threshold. + pub fn until_available(&self) -> EventServerAvailability<'_> { + EventServerAvailability { parent: self } + } + + /// Whether we're able to take connections. + #[inline] + fn is_available(&self) -> bool { + self.channel.receiver_count() < MAX_RECEIVERS + } + /// Spawns a new [`EventEmitter`] where events will be sent to. pub async fn spawn( &self, @@ -122,8 +134,8 @@ impl EventServer { ) -> io::Result<()> { // Sender doesn't have a try_subscribe, so this is our last-ditch // effort to avoid a panic - if self.channel.receiver_count() + 1 > MAX_RECEIVERS { - todo!("Too many connections!"); + if !self.is_available() { + return Err(io::Error::new(io::ErrorKind::Other, "Too many connections")); } EventEmitter::new(inner, client_addr.into(), self.channel.subscribe()).run(); @@ -152,3 +164,24 @@ impl Default for EventServer { Self::new() } } + +pub struct EventServerAvailability<'a> { + parent: &'a EventServer, +} + +impl<'a> Future for EventServerAvailability<'a> { + type Output = (); + + fn poll( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + use std::task::Poll; + + if self.parent.is_available() { + Poll::Ready(()) + } else { + Poll::Pending + } + } +} diff --git a/nzrd/src/event.rs b/nzrd/src/event.rs index d839781..901e8ae 100644 --- a/nzrd/src/event.rs +++ b/nzrd/src/event.rs @@ -8,6 +8,8 @@ pub async fn event_server(ctx: Context) -> io::Result<()> { let sock = UnixListener::bind(&ctx.config.rpc.events_sock)?; loop { + // Wait until we have an available slot for a connection + ctx.events.until_available().await; let (client, addr) = sock.accept().await?; ctx.events.spawn(client, addr).await?; }