events: try to wait for available connection slots
This commit is contained in:
parent
1d97134839
commit
42fad4920a
|
@ -1,3 +1,4 @@
|
|||
use futures::Future;
|
||||
use std::{fmt, io, pin::Pin};
|
||||
|
||||
use futures::SinkExt;
|
||||
|
@ -114,6 +115,17 @@ impl EventServer {
|
|||
Self { channel }
|
||||
}
|
||||
|
||||
/// Returns a future that returns [`Poll::Pending`] until the client count falls below the threshold.
|
||||
pub fn until_available(&self) -> EventServerAvailability<'_> {
|
||||
EventServerAvailability { parent: self }
|
||||
}
|
||||
|
||||
/// Whether we're able to take connections.
|
||||
#[inline]
|
||||
fn is_available(&self) -> bool {
|
||||
self.channel.receiver_count() < MAX_RECEIVERS
|
||||
}
|
||||
|
||||
/// Spawns a new [`EventEmitter`] where events will be sent to.
|
||||
pub async fn spawn<T: AsyncWrite + Send + 'static>(
|
||||
&self,
|
||||
|
@ -122,8 +134,8 @@ impl EventServer {
|
|||
) -> io::Result<()> {
|
||||
// Sender<T> doesn't have a try_subscribe, so this is our last-ditch
|
||||
// effort to avoid a panic
|
||||
if self.channel.receiver_count() + 1 > MAX_RECEIVERS {
|
||||
todo!("Too many connections!");
|
||||
if !self.is_available() {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "Too many connections"));
|
||||
}
|
||||
|
||||
EventEmitter::new(inner, client_addr.into(), self.channel.subscribe()).run();
|
||||
|
@ -152,3 +164,24 @@ impl Default for EventServer {
|
|||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EventServerAvailability<'a> {
|
||||
parent: &'a EventServer,
|
||||
}
|
||||
|
||||
impl<'a> Future for EventServerAvailability<'a> {
|
||||
type Output = ();
|
||||
|
||||
fn poll(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
use std::task::Poll;
|
||||
|
||||
if self.parent.is_available() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,8 @@ pub async fn event_server(ctx: Context) -> io::Result<()> {
|
|||
let sock = UnixListener::bind(&ctx.config.rpc.events_sock)?;
|
||||
|
||||
loop {
|
||||
// Wait until we have an available slot for a connection
|
||||
ctx.events.until_available().await;
|
||||
let (client, addr) = sock.accept().await?;
|
||||
ctx.events.spawn(client, addr).await?;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue