events: use JSON instead of Bincode
Bincode doesn't support serde's deserialize_any, so it's easier to use JSON for de/serializing events. Since they'll likely be pretty sparse, the size difference shouldn't be a big deal.
This commit is contained in:
parent
ece1f9a089
commit
40532c9e36
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -1540,7 +1540,6 @@ dependencies = [
|
||||||
name = "nzr-api"
|
name = "nzr-api"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bincode",
|
|
||||||
"diesel",
|
"diesel",
|
||||||
"figment",
|
"figment",
|
||||||
"futures",
|
"futures",
|
||||||
|
@ -1549,6 +1548,7 @@ dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"regex",
|
"regex",
|
||||||
"serde",
|
"serde",
|
||||||
|
"serde_json",
|
||||||
"tarpc",
|
"tarpc",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|
|
@ -23,7 +23,7 @@ regex = "1"
|
||||||
lazy_static = "1"
|
lazy_static = "1"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tokio-serde = { version = "0.9", features = ["bincode"] }
|
tokio-serde = { version = "0.9", features = ["bincode"] }
|
||||||
bincode = "1.3"
|
serde_json = "1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
uuid = { version = "1.2.2", features = ["serde", "v4"] }
|
uuid = { version = "1.2.2", features = ["serde", "v4"] }
|
||||||
|
|
|
@ -41,7 +41,7 @@ where
|
||||||
Poll::Ready(res) => {
|
Poll::Ready(res) => {
|
||||||
let our_res = res.map(|res| {
|
let our_res = res.map(|res| {
|
||||||
res.map_err(|e| e.into()).and_then(|bytes| {
|
res.map_err(|e| e.into()).and_then(|bytes| {
|
||||||
let msg: EventMessage = bincode::deserialize(&bytes)?;
|
let msg: EventMessage = serde_json::from_slice(&bytes)?;
|
||||||
Ok(msg)
|
Ok(msg)
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
pub mod client;
|
pub mod client;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test;
|
||||||
|
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
|
@ -8,7 +10,7 @@ use thiserror::Error;
|
||||||
|
|
||||||
use crate::model;
|
use crate::model;
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
pub enum ResourceAction {
|
pub enum ResourceAction {
|
||||||
/// The referenced resource was created.
|
/// The referenced resource was created.
|
||||||
Created,
|
Created,
|
||||||
|
@ -19,7 +21,7 @@ pub enum ResourceAction {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents an event pertaining to a specific action.
|
/// Represents an event pertaining to a specific action.
|
||||||
#[derive(Clone, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct ResourceEvent<T> {
|
pub struct ResourceEvent<T> {
|
||||||
pub action: ResourceAction,
|
pub action: ResourceAction,
|
||||||
/// The entity that was acted upon.
|
/// The entity that was acted upon.
|
||||||
|
@ -27,7 +29,7 @@ pub struct ResourceEvent<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents any event that is emitted by Nazrin.
|
/// Represents any event that is emitted by Nazrin.
|
||||||
#[derive(Clone, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
#[serde(tag = "event")]
|
#[serde(tag = "event")]
|
||||||
pub enum EventMessage {
|
pub enum EventMessage {
|
||||||
/// A subnet was created, modified, or deleted.
|
/// A subnet was created, modified, or deleted.
|
||||||
|
@ -41,7 +43,7 @@ pub enum EventError {
|
||||||
#[error("Transport error: {0}")]
|
#[error("Transport error: {0}")]
|
||||||
Transport(#[from] io::Error),
|
Transport(#[from] io::Error),
|
||||||
#[error("Serialization error: {0}")]
|
#[error("Serialization error: {0}")]
|
||||||
Bincode(#[from] bincode::Error),
|
Json(#[from] serde_json::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Emittable {
|
pub trait Emittable {
|
||||||
|
|
|
@ -20,6 +20,8 @@ pub enum SocketAddr {
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
Unix(std::os::unix::net::SocketAddr),
|
Unix(std::os::unix::net::SocketAddr),
|
||||||
Net(std::net::SocketAddr),
|
Net(std::net::SocketAddr),
|
||||||
|
#[cfg(test)]
|
||||||
|
None,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for SocketAddr {
|
impl fmt::Display for SocketAddr {
|
||||||
|
@ -30,6 +32,8 @@ impl fmt::Display for SocketAddr {
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
Self::Unix(addr) => std::fmt::Debug::fmt(addr, f),
|
Self::Unix(addr) => std::fmt::Debug::fmt(addr, f),
|
||||||
Self::Net(addr) => addr.fmt(f),
|
Self::Net(addr) => addr.fmt(f),
|
||||||
|
#[cfg(test)]
|
||||||
|
Self::None => write!(f, "mock client"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -145,7 +149,7 @@ impl EventServer {
|
||||||
|
|
||||||
/// Send the given event to all connected clients.
|
/// Send the given event to all connected clients.
|
||||||
pub async fn emit(&self, msg: EventMessage) {
|
pub async fn emit(&self, msg: EventMessage) {
|
||||||
let bytes = match bincode::serialize(&msg) {
|
let bytes = match serde_json::to_vec(&msg) {
|
||||||
Ok(bytes) => bytes,
|
Ok(bytes) => bytes,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
tracing::error!("Failed to serialize: {err}");
|
tracing::error!("Failed to serialize: {err}");
|
||||||
|
|
43
nzr-api/src/event/test.rs
Normal file
43
nzr-api/src/event/test.rs
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use futures::StreamExt;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
event::{server::SocketAddr, EventMessage, ResourceAction},
|
||||||
|
net::cidr::CidrV4,
|
||||||
|
nzr_event,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn event_serde() {
|
||||||
|
let (rx, tx) = tokio::io::duplex(1024);
|
||||||
|
let server = super::server::EventServer::default();
|
||||||
|
server.spawn(tx, SocketAddr::None).await.unwrap();
|
||||||
|
let mut client = super::client::EventClient::new(rx);
|
||||||
|
let net = CidrV4::from_str("192.0.2.0/24").unwrap();
|
||||||
|
let some_subnet = crate::model::Subnet {
|
||||||
|
name: "whatever".into(),
|
||||||
|
data: crate::model::SubnetData {
|
||||||
|
ifname: "eth0".into(),
|
||||||
|
network: net,
|
||||||
|
start_host: net.make_ip(10).unwrap(),
|
||||||
|
end_host: net.make_ip(254).unwrap(),
|
||||||
|
gateway4: Some(net.make_ip(1).unwrap()),
|
||||||
|
dns: Vec::new(),
|
||||||
|
domain_name: Some("homestarrunnner.net".parse().unwrap()),
|
||||||
|
vlan_id: None,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
nzr_event!(server, Created, some_subnet);
|
||||||
|
let next = client
|
||||||
|
.next()
|
||||||
|
.await
|
||||||
|
.expect("client must receive message")
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let EventMessage::Subnet(net) = next else {
|
||||||
|
panic!("Unexpected event received: {next:?}");
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(net.action, ResourceAction::Created);
|
||||||
|
}
|
Loading…
Reference in a new issue