Compare commits

..

No commits in common. "40532c9e36443fe38775adadd15b383ec7f17c8d" and "0cb3aea62e27278a9db885c101f9bdfe90d4f96b" have entirely different histories.

30 changed files with 406 additions and 1127 deletions

75
Cargo.lock generated
View file

@ -1046,6 +1046,17 @@ dependencies = [
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
[[package]]
name = "hostname"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba"
dependencies = [
"cfg-if",
"libc",
"windows",
]
[[package]] [[package]]
name = "http" name = "http"
version = "1.1.0" version = "1.1.0"
@ -1521,6 +1532,15 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "nzr" name = "nzr"
version = "0.9.0" version = "0.9.0"
@ -1548,12 +1568,9 @@ dependencies = [
"log", "log",
"regex", "regex",
"serde", "serde",
"serde_json",
"tarpc", "tarpc",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-serde 0.9.0",
"tracing",
"uuid", "uuid",
] ]
@ -1587,6 +1604,7 @@ dependencies = [
"home", "home",
"libc", "libc",
"libsqlite3-sys", "libsqlite3-sys",
"log",
"nix", "nix",
"nzr-api", "nzr-api",
"nzr-virt", "nzr-virt",
@ -1598,13 +1616,12 @@ dependencies = [
"serde_with", "serde_with",
"serde_yaml", "serde_yaml",
"stdext", "stdext",
"syslog",
"tarpc", "tarpc",
"tempfile", "tempfile",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-serde 0.9.0", "tokio-serde 0.9.0",
"tracing",
"tracing-subscriber",
"trait-variant", "trait-variant",
"uuid", "uuid",
"zerocopy", "zerocopy",
@ -1624,21 +1641,6 @@ dependencies = [
"tracing-subscriber", "tracing-subscriber",
] ]
[[package]]
name = "nzrdns"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"futures",
"hickory-proto",
"hickory-server",
"nzr-api",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]] [[package]]
name = "object" name = "object"
version = "0.36.2" version = "0.36.2"
@ -2289,6 +2291,18 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"
[[package]]
name = "syslog"
version = "7.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "019f1500a13379b7d051455df397c75770de6311a7a188a699499502704d9f10"
dependencies = [
"hostname",
"libc",
"log",
"time",
]
[[package]] [[package]]
name = "tabled" name = "tabled"
version = "0.15.0" version = "0.15.0"
@ -2412,7 +2426,9 @@ checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885"
dependencies = [ dependencies = [
"deranged", "deranged",
"itoa", "itoa",
"libc",
"num-conv", "num-conv",
"num_threads",
"powerfmt", "powerfmt",
"serde", "serde",
"time-core", "time-core",
@ -2926,6 +2942,25 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
dependencies = [
"windows-core",
"windows-targets",
]
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.42.0" version = "0.42.0"

View file

@ -1,3 +1,3 @@
[workspace] [workspace]
members = ["nzrd", "nzr-api", "client", "nzrdhcp", "nzr-virt", "omyacid", "nzrdns"] members = ["nzrd", "nzr-api", "client", "nzrdhcp", "nzr-virt", "omyacid"]
resolver = "2" resolver = "2"

View file

@ -1,6 +1,5 @@
use clap::{CommandFactory, FromArgMatches, Parser, Subcommand}; use clap::{CommandFactory, FromArgMatches, Parser, Subcommand};
use nzr_api::config; use nzr_api::config;
use nzr_api::error::Simplify;
use nzr_api::hickory_proto::rr::Name; use nzr_api::hickory_proto::rr::Name;
use nzr_api::model; use nzr_api::model;
use nzr_api::net::cidr::CidrV4; use nzr_api::net::cidr::CidrV4;
@ -333,9 +332,9 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
let mut net = client let mut net = client
.get_subnets(nzr_api::default_ctx()) .get_subnets(nzr_api::default_ctx())
.await .await
.simplify() .map_err(|e| e.to_string())
.and_then(|res| { .and_then(|res| {
res.iter() res?.iter()
.find_map(|ent| { .find_map(|ent| {
if ent.name == args.name { if ent.name == args.name {
Some(ent.clone()) Some(ent.clone())
@ -343,7 +342,7 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
None None
} }
}) })
.ok_or_else(|| format!("Couldn't find network {}", &args.name).into()) .ok_or_else(|| format!("Couldn't find network {}", &args.name))
})?; })?;
// merge in the new args // merge in the new args
@ -366,11 +365,15 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
} }
// run the update // run the update
let net = client client
.modify_subnet(nzr_api::default_ctx(), net) .modify_subnet(nzr_api::default_ctx(), net)
.await .await
.simplify()?; .map_err(|err| format!("RPC error: {}", err))
println!("Subnet {} updated.", net.name); .and_then(|res| {
res.map(|e| {
println!("Subnet {} updated.", e.name);
})
})?;
} }
NetCmd::Dump { name } => { NetCmd::Dump { name } => {
let subnets = (client.get_subnets(nzr_api::default_ctx()).await?)?; let subnets = (client.get_subnets(nzr_api::default_ctx()).await?)?;

View file

@ -17,17 +17,14 @@ uuid = { version = "1.2.2", features = ["serde"] }
hickory-proto = { version = "0.24", features = ["serde-config"] } hickory-proto = { version = "0.24", features = ["serde-config"] }
log = "0.4.17" log = "0.4.17"
diesel = { version = "2.2", optional = true } diesel = { version = "2.2", optional = true }
futures = "0.3" futures = { version = "0.3", optional = true }
thiserror = "1" thiserror = "1"
regex = "1" regex = "1"
lazy_static = "1" lazy_static = "1"
tracing = "0.1"
tokio-serde = { version = "0.9", features = ["bincode"] }
serde_json = "1"
[dev-dependencies] [dev-dependencies]
uuid = { version = "1.2.2", features = ["serde", "v4"] } uuid = { version = "1.2.2", features = ["serde", "v4"] }
[features] [features]
diesel = ["dep:diesel"] diesel = ["dep:diesel"]
mock = [] mock = ["dep:futures"]

View file

@ -72,7 +72,6 @@ impl CloudConfig {
pub struct RPCConfig { pub struct RPCConfig {
pub socket_path: PathBuf, pub socket_path: PathBuf,
pub admin_group: Option<String>, pub admin_group: Option<String>,
pub events_sock: PathBuf,
} }
/// The root configuration struct. /// The root configuration struct.
@ -99,7 +98,6 @@ impl Default for Config {
rpc: RPCConfig { rpc: RPCConfig {
socket_path: PathBuf::from("/var/run/nazrin/nzrd.sock"), socket_path: PathBuf::from("/var/run/nazrin/nzrd.sock"),
admin_group: None, admin_group: None,
events_sock: PathBuf::from("/var/run/nazrin/events.sock"),
}, },
db_uri: "sqlite:/var/lib/nazrin/main_sql.db".to_owned(), db_uri: "sqlite:/var/lib/nazrin/main_sql.db".to_owned(),
libvirt_uri: match std::env::var("LIBVIRT_URI") { libvirt_uri: match std::env::var("LIBVIRT_URI") {

View file

@ -1,208 +0,0 @@
use std::fmt;
use serde::{Deserialize, Serialize};
use tarpc::client::RpcError;
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ErrorType {
/// Entity was not found.
NotFound,
/// Error occurred with a database call.
Database,
/// Error occurred in a libvirt call.
VirtError,
/// Error occurred while parsing input.
Parse,
/// An unknown API error occurred.
Other,
}
impl ErrorType {
fn as_str(&self) -> &'static str {
match self {
ErrorType::NotFound => "Entity not found",
ErrorType::Database => "Database error",
ErrorType::VirtError => "libvirt error",
ErrorType::Parse => "Unable to parse input",
ErrorType::Other => "Unknown API error",
}
}
}
impl fmt::Display for ErrorType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.as_str().fmt(f)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ApiError {
error_type: ErrorType,
message: Option<String>,
inner: Option<InnerError>,
}
impl fmt::Display for ApiError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.message
.as_deref()
.unwrap_or_else(|| self.error_type.as_str())
.fmt(f)?;
if let Some(inner) = &self.inner {
write!(f, ": {inner}")?;
}
Ok(())
}
}
impl std::error::Error for ApiError {}
impl ApiError {
pub fn new<E>(error_type: ErrorType, message: impl Into<String>, err: E) -> Self
where
E: std::error::Error,
{
Self {
error_type,
message: Some(message.into()),
inner: Some(err.into()),
}
}
pub fn err_type(&self) -> ErrorType {
self.error_type
}
}
impl From<ErrorType> for ApiError {
fn from(value: ErrorType) -> Self {
Self {
error_type: value,
message: None,
inner: None,
}
}
}
impl<T> From<T> for ApiError
where
T: AsRef<str>,
{
fn from(value: T) -> Self {
let value = value.as_ref();
Self {
error_type: ErrorType::Other,
message: Some(value.to_owned()),
inner: None,
}
}
}
#[derive(Serialize, Deserialize)]
struct InnerError {
error_debug: String,
error_message: String,
}
impl fmt::Debug for InnerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.error_debug.fmt(f)
}
}
impl fmt::Display for InnerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.error_message.fmt(f)
}
}
impl<E> From<E> for InnerError
where
E: std::error::Error,
{
fn from(value: E) -> Self {
Self {
error_debug: format!("{value:?}"),
error_message: format!("{value}"),
}
}
}
pub trait ToApiResult<T> {
/// Converts the result's error type to [`ApiError`] with [`ErrorType::Other`].
///
/// [`ApiError`]: nzr-api::error::ApiError
/// [`ErrorType::Other`]: nzr-api::error::ErrorType::Other
fn to_api(self) -> Result<T, ApiError>;
/// Converts the result's error type to [`ApiError`] with
/// [`ErrorType::Other`], and the given context.
///
/// [`ApiError`]: nzr-api::error::ApiError
/// [`ErrorType::Other`]: nzr-api::error::ErrorType::Other
fn to_api_with(self, context: impl AsRef<str>) -> Result<T, ApiError>;
/// Converts the result's error type to [`ApiError`] with the given
/// [`ErrorType`] and context.
///
/// [`ApiError`]: nzr-api::error::ApiError
/// [`ErrorType`]: nzr-api::error::ErrorType
fn to_api_with_type(self, err_type: ErrorType, context: impl AsRef<str>)
-> Result<T, ApiError>;
/// Converts the result's error type to [`ApiError`] with the given
/// [`ErrorType`].
///
/// [`ApiError`]: nzr-api::error::ApiError
/// [`ErrorType`]: nzr-api::error::ErrorType
fn to_api_type(self, err_type: ErrorType) -> Result<T, ApiError>;
}
impl<T, E> ToApiResult<T> for Result<T, E>
where
E: std::error::Error + 'static,
{
fn to_api(self) -> Result<T, ApiError> {
self.map_err(|e| ApiError {
error_type: ErrorType::Other,
message: None,
inner: Some(e.into()),
})
}
fn to_api_with(self, context: impl AsRef<str>) -> Result<T, ApiError> {
self.map_err(|e| ApiError {
error_type: ErrorType::Other,
message: Some(context.as_ref().to_owned()),
inner: Some(e.into()),
})
}
fn to_api_type(self, err_type: ErrorType) -> Result<T, ApiError> {
self.map_err(|e| ApiError {
error_type: err_type,
message: None,
inner: Some(e.into()),
})
}
fn to_api_with_type(
self,
err_type: ErrorType,
context: impl AsRef<str>,
) -> Result<T, ApiError> {
self.map_err(|e| ApiError {
error_type: err_type,
message: Some(context.as_ref().to_owned()),
inner: Some(e.into()),
})
}
}
pub trait Simplify<T> {
fn simplify(self) -> Result<T, ApiError>;
}
impl<T> Simplify<T> for Result<Result<T, ApiError>, RpcError> {
/// Flattens a Result of `RpcError` and `ApiError` to just `ApiError`.
fn simplify(self) -> Result<T, ApiError> {
self.to_api_with("RPC Error").and_then(|r| r)
}
}

View file

@ -1,53 +0,0 @@
use std::{pin::Pin, task::Poll};
use futures::{Stream, TryStreamExt};
use tarpc::tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use tokio::io::AsyncRead;
use super::{EventError, EventMessage};
/// Client for receiving various events emitted by Nazrin.
pub struct EventClient<T>
where
T: AsyncRead,
{
transport: Pin<Box<FramedRead<T, LengthDelimitedCodec>>>,
}
impl<T> EventClient<T>
where
T: AsyncRead,
{
/// Creates a new EventClient.
pub fn new(inner: T) -> Self {
let transport = FramedRead::new(inner, LengthDelimitedCodec::new());
Self {
transport: Box::pin(transport),
}
}
}
impl<T> Stream for EventClient<T>
where
T: AsyncRead,
{
type Item = Result<EventMessage, EventError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.as_mut().transport.try_poll_next_unpin(cx) {
Poll::Ready(res) => {
let our_res = res.map(|res| {
res.map_err(|e| e.into()).and_then(|bytes| {
let msg: EventMessage = serde_json::from_slice(&bytes)?;
Ok(msg)
})
});
Poll::Ready(our_res)
}
Poll::Pending => Poll::Pending,
}
}
}

View file

@ -1,77 +0,0 @@
pub mod client;
pub mod server;
#[cfg(test)]
mod test;
use std::io;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::model;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum ResourceAction {
/// The referenced resource was created.
Created,
/// The referenced resource was deleted, and is no longer available.
Deleted,
/// The referenced resource was modified in some way.
Modified,
}
/// Represents an event pertaining to a specific action.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResourceEvent<T> {
pub action: ResourceAction,
/// The entity that was acted upon.
pub entity: T,
}
/// Represents any event that is emitted by Nazrin.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "event")]
pub enum EventMessage {
/// A subnet was created, modified, or deleted.
Subnet(ResourceEvent<model::Subnet>),
/// An instance was created, modified, or deleted.
Instance(ResourceEvent<model::Instance>),
}
#[derive(Debug, Error)]
pub enum EventError {
#[error("Transport error: {0}")]
Transport(#[from] io::Error),
#[error("Serialization error: {0}")]
Json(#[from] serde_json::Error),
}
pub trait Emittable {
fn as_event(&self, action: ResourceAction) -> EventMessage;
}
macro_rules! emittable {
($t:ty, $msg:ident) => {
impl Emittable for $t {
fn as_event(&self, action: ResourceAction) -> EventMessage {
EventMessage::$msg(ResourceEvent {
action,
entity: self.clone(),
})
}
}
};
}
emittable!(model::Instance, Instance);
emittable!(model::Subnet, Subnet);
#[macro_export]
macro_rules! nzr_event {
($srv:expr, $act:ident, $ent:tt) => {{
use $crate::event::Emittable;
$srv.emit($ent.as_event($crate::event::ResourceAction::$act))
.await
}};
}

View file

@ -1,191 +0,0 @@
use futures::Future;
use std::{fmt, io, pin::Pin};
use futures::SinkExt;
use tarpc::tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
use tokio::{
io::AsyncWrite,
sync::broadcast::{self, Receiver, Sender},
};
use tracing::instrument;
use super::EventMessage;
/// Representation of multiple types of SocketAddrs, because you can't have just
/// one!
#[derive(Debug)]
pub enum SocketAddr {
#[cfg(unix)]
TokioUnix(tokio::net::unix::SocketAddr),
#[cfg(unix)]
Unix(std::os::unix::net::SocketAddr),
Net(std::net::SocketAddr),
#[cfg(test)]
None,
}
impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
#[cfg(unix)]
Self::TokioUnix(addr) => std::fmt::Debug::fmt(addr, f),
#[cfg(unix)]
Self::Unix(addr) => std::fmt::Debug::fmt(addr, f),
Self::Net(addr) => addr.fmt(f),
#[cfg(test)]
Self::None => write!(f, "mock client"),
}
}
}
macro_rules! as_sockaddr {
($id:ident, $t:ty) => {
impl From<$t> for SocketAddr {
fn from(value: $t) -> Self {
Self::$id(value)
}
}
};
}
#[cfg(unix)]
as_sockaddr!(TokioUnix, tokio::net::unix::SocketAddr);
#[cfg(unix)]
as_sockaddr!(Unix, std::os::unix::net::SocketAddr);
as_sockaddr!(Net, std::net::SocketAddr);
/// Represents a connection to a client. Instead of being owned by the server
/// struct, a [`tokio::sync::broadcast::Receiver`] is used to get the serialized
/// message and pass it to the client.
///
/// [`tokio::sync::broadcast::Receiver`]: tokio::sync::broadcast::Receiver
struct EventEmitter<T>
where
T: AsyncWrite + Send + 'static,
{
transport: Pin<Box<FramedWrite<T, LengthDelimitedCodec>>>,
client_addr: SocketAddr,
channel: Receiver<Vec<u8>>,
}
impl<T> EventEmitter<T>
where
T: AsyncWrite + Send + 'static,
{
fn new(inner: T, client_addr: SocketAddr, channel: Receiver<Vec<u8>>) -> Self {
let transport = FramedWrite::new(inner, LengthDelimitedCodec::new());
Self {
transport: Box::pin(transport),
client_addr,
channel,
}
}
#[instrument(skip(self), fields(client = %self.client_addr))]
async fn handler(&mut self) -> bool {
match self.channel.recv().await {
Ok(msg) => {
if let Err(err) = self.transport.send(msg.into()).await {
tracing::error!("Couldn't write to client: {err}");
false
} else {
true
}
}
Err(err) => {
tracing::error!("IPC error: {err}");
false
}
}
}
fn run(mut self) {
tokio::spawn(async move { while self.handler().await {} });
}
}
/// Handles the creation and sending of events to clients.
pub struct EventServer {
channel: Sender<Vec<u8>>,
}
// TODO: consider letting this be configurable
const MAX_RECEIVERS: usize = 16;
impl EventServer {
/// Creates a new EventServer.
pub fn new() -> Self {
let (channel, _) = broadcast::channel(MAX_RECEIVERS);
Self { channel }
}
/// Returns a future that returns [`Poll::Pending`] until the client count falls below the threshold.
pub fn until_available(&self) -> EventServerAvailability<'_> {
EventServerAvailability { parent: self }
}
/// Whether we're able to take connections.
#[inline]
fn is_available(&self) -> bool {
self.channel.receiver_count() < MAX_RECEIVERS
}
/// Spawns a new [`EventEmitter`] where events will be sent to.
pub async fn spawn<T: AsyncWrite + Send + 'static>(
&self,
inner: T,
client_addr: impl Into<SocketAddr>,
) -> io::Result<()> {
// Sender<T> doesn't have a try_subscribe, so this is our last-ditch
// effort to avoid a panic
if !self.is_available() {
return Err(io::Error::new(io::ErrorKind::Other, "Too many connections"));
}
EventEmitter::new(inner, client_addr.into(), self.channel.subscribe()).run();
Ok(())
}
/// Send the given event to all connected clients.
pub async fn emit(&self, msg: EventMessage) {
let bytes = match serde_json::to_vec(&msg) {
Ok(bytes) => bytes,
Err(err) => {
tracing::error!("Failed to serialize: {err}");
return;
}
};
if self.channel.send(bytes).is_err() {
tracing::debug!("Tried to emit an event, but no clients were around to hear it");
}
}
}
impl Default for EventServer {
fn default() -> Self {
Self::new()
}
}
pub struct EventServerAvailability<'a> {
parent: &'a EventServer,
}
impl<'a> Future for EventServerAvailability<'a> {
type Output = ();
fn poll(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
use std::task::Poll;
if self.parent.is_available() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}

View file

@ -1,43 +0,0 @@
use std::str::FromStr;
use futures::StreamExt;
use crate::{
event::{server::SocketAddr, EventMessage, ResourceAction},
net::cidr::CidrV4,
nzr_event,
};
#[tokio::test]
async fn event_serde() {
let (rx, tx) = tokio::io::duplex(1024);
let server = super::server::EventServer::default();
server.spawn(tx, SocketAddr::None).await.unwrap();
let mut client = super::client::EventClient::new(rx);
let net = CidrV4::from_str("192.0.2.0/24").unwrap();
let some_subnet = crate::model::Subnet {
name: "whatever".into(),
data: crate::model::SubnetData {
ifname: "eth0".into(),
network: net,
start_host: net.make_ip(10).unwrap(),
end_host: net.make_ip(254).unwrap(),
gateway4: Some(net.make_ip(1).unwrap()),
dns: Vec::new(),
domain_name: Some("homestarrunnner.net".parse().unwrap()),
vlan_id: None,
},
};
nzr_event!(server, Created, some_subnet);
let next = client
.next()
.await
.expect("client must receive message")
.unwrap();
let EventMessage::Subnet(net) = next else {
panic!("Unexpected event received: {next:?}");
};
assert_eq!(net.action, ResourceAction::Created);
}

View file

@ -1,12 +1,9 @@
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use error::ApiError;
use model::{CreateStatus, Instance, SshPubkey, Subnet}; use model::{CreateStatus, Instance, SshPubkey, Subnet};
pub mod args; pub mod args;
pub mod config; pub mod config;
pub mod error;
pub mod event;
#[cfg(feature = "mock")] #[cfg(feature = "mock")]
pub mod mock; pub mod mock;
pub mod model; pub mod model;
@ -26,40 +23,40 @@ pub enum InstanceQuery {
#[tarpc::service] #[tarpc::service]
pub trait Nazrin { pub trait Nazrin {
/// Creates a new instance. /// Creates a new instance.
async fn new_instance(build_args: args::NewInstance) -> Result<uuid::Uuid, ApiError>; async fn new_instance(build_args: args::NewInstance) -> Result<uuid::Uuid, String>;
/// Poll for the current status of an instance being created. /// Poll for the current status of an instance being created.
async fn poll_new_instance(task_id: uuid::Uuid) -> Option<CreateStatus>; async fn poll_new_instance(task_id: uuid::Uuid) -> Option<CreateStatus>;
/// Deletes an existing instance. /// Deletes an existing instance.
/// ///
/// This should involve deleting all related disks and clearing /// This should involve deleting all related disks and clearing
/// the lease information from the subnet data, if any. /// the lease information from the subnet data, if any.
async fn delete_instance(name: String) -> Result<(), ApiError>; async fn delete_instance(name: String) -> Result<(), String>;
/// Gets a single instance by the given InstanceQuery. /// Gets a single instance by the given InstanceQuery.
async fn find_instance(query: InstanceQuery) -> Result<Option<Instance>, ApiError>; async fn find_instance(query: InstanceQuery) -> Result<Option<Instance>, String>;
/// Gets a list of existing instances. /// Gets a list of existing instances.
async fn get_instances(with_status: bool) -> Result<Vec<Instance>, ApiError>; async fn get_instances(with_status: bool) -> Result<Vec<Instance>, String>;
/// Cleans up unusable entries in the database. /// Cleans up unusable entries in the database.
async fn garbage_collect() -> Result<(), ApiError>; async fn garbage_collect() -> Result<(), String>;
/// Creates a new subnet. /// Creates a new subnet.
/// ///
/// Unlike instances, subnets shouldn't perform any changes to the /// Unlike instances, subnets shouldn't perform any changes to the
/// interfaces they reference. This should be used primarily for /// interfaces they reference. This should be used primarily for
/// ease-of-use and bookkeeping (e.g., assigning dynamic leases). /// ease-of-use and bookkeeping (e.g., assigning dynamic leases).
async fn new_subnet(build_args: Subnet) -> Result<Subnet, ApiError>; async fn new_subnet(build_args: Subnet) -> Result<Subnet, String>;
/// Modifies an existing subnet. /// Modifies an existing subnet.
async fn modify_subnet(edit_args: Subnet) -> Result<Subnet, ApiError>; async fn modify_subnet(edit_args: Subnet) -> Result<Subnet, String>;
/// Gets a list of existing subnets. /// Gets a list of existing subnets.
async fn get_subnets() -> Result<Vec<Subnet>, ApiError>; async fn get_subnets() -> Result<Vec<Subnet>, String>;
/// Deletes an existing subnet. /// Deletes an existing subnet.
async fn delete_subnet(interface: String) -> Result<(), ApiError>; async fn delete_subnet(interface: String) -> Result<(), String>;
/// Gets the cloud-init user-data for the given instance. /// Gets the cloud-init user-data for the given instance.
async fn get_instance_userdata(id: i32) -> Result<Vec<u8>, ApiError>; async fn get_instance_userdata(id: i32) -> Result<Vec<u8>, String>;
/// Gets all SSH keys stored in the database. /// Gets all SSH keys stored in the database.
async fn get_ssh_pubkeys() -> Result<Vec<SshPubkey>, ApiError>; async fn get_ssh_pubkeys() -> Result<Vec<SshPubkey>, String>;
/// Adds a new SSH public key to the database. /// Adds a new SSH public key to the database.
async fn add_ssh_pubkey(pub_key: String) -> Result<SshPubkey, ApiError>; async fn add_ssh_pubkey(pub_key: String) -> Result<SshPubkey, String>;
/// Deletes an SSH public key from the database. /// Deletes an SSH public key from the database.
async fn delete_ssh_pubkey(id: i32) -> Result<(), ApiError>; async fn delete_ssh_pubkey(id: i32) -> Result<(), String>;
} }
/// Create a new NazrinClient. /// Create a new NazrinClient.

View file

@ -1,20 +1,20 @@
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use crate::{args, error::ApiError, model, net::cidr::CidrV4}; use crate::{args, model, net::cidr::CidrV4};
pub trait NzrClientExt { pub trait NzrClientExt {
#[allow(async_fn_in_trait)] #[allow(async_fn_in_trait)]
async fn new_mock_instance( async fn new_mock_instance(
&mut self, &mut self,
name: impl AsRef<str>, name: impl AsRef<str>,
) -> Result<Result<model::Instance, ApiError>, crate::RpcError>; ) -> Result<Result<model::Instance, String>, crate::RpcError>;
} }
impl NzrClientExt for crate::NazrinClient { impl NzrClientExt for crate::NazrinClient {
async fn new_mock_instance( async fn new_mock_instance(
&mut self, &mut self,
name: impl AsRef<str>, name: impl AsRef<str>,
) -> Result<Result<model::Instance, ApiError>, crate::RpcError> { ) -> Result<Result<model::Instance, String>, crate::RpcError> {
let name = name.as_ref().to_owned(); let name = name.as_ref().to_owned();
let subnet = self let subnet = self

View file

@ -10,7 +10,6 @@ use futures::{future, StreamExt};
use tokio::{sync::RwLock, task::JoinHandle}; use tokio::{sync::RwLock, task::JoinHandle};
use crate::{ use crate::{
error::{ApiError, ErrorType},
model, model,
net::{cidr::CidrV4, mac::MacAddr}, net::{cidr::CidrV4, mac::MacAddr},
InstanceQuery, Nazrin, NazrinClient, InstanceQuery, Nazrin, NazrinClient,
@ -63,14 +62,14 @@ impl Nazrin for MockServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
build_args: crate::args::NewInstance, build_args: crate::args::NewInstance,
) -> Result<uuid::Uuid, ApiError> { ) -> Result<uuid::Uuid, String> {
let mut db = self.db.write().await; let mut db = self.db.write().await;
let Some(net_pos) = db let Some(net_pos) = db
.subnets .subnets
.iter() .iter()
.position(|s| s.as_ref().filter(|s| s.name == build_args.subnet).is_some()) .position(|s| s.as_ref().filter(|s| s.name == build_args.subnet).is_some())
else { else {
return Err("Subnet doesn't exist".into()); return Err("Subnet doesn't exist".to_owned());
}; };
let subnet = db.subnets[net_pos].as_ref().unwrap().clone(); let subnet = db.subnets[net_pos].as_ref().unwrap().clone();
let cur_lease = *(db let cur_lease = *(db
@ -135,11 +134,7 @@ impl Nazrin for MockServer {
} }
} }
async fn delete_instance( async fn delete_instance(self, _: tarpc::context::Context, name: String) -> Result<(), String> {
self,
_: tarpc::context::Context,
name: String,
) -> Result<(), ApiError> {
let mut db = self.db.write().await; let mut db = self.db.write().await;
let Some(inst) = db let Some(inst) = db
.instances .instances
@ -147,7 +142,7 @@ impl Nazrin for MockServer {
.find(|i| i.as_ref().filter(|i| i.name == name).is_some()) .find(|i| i.as_ref().filter(|i| i.name == name).is_some())
.take() .take()
else { else {
return Err("Instance doesn't exist".into()); return Err("Instance doesn't exist".to_owned());
}; };
inst.take(); inst.take();
Ok(()) Ok(())
@ -157,7 +152,7 @@ impl Nazrin for MockServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
query: crate::InstanceQuery, query: crate::InstanceQuery,
) -> Result<Option<crate::model::Instance>, ApiError> { ) -> Result<Option<crate::model::Instance>, String> {
let db = self.db.read().await; let db = self.db.read().await;
let res = { let res = {
@ -182,7 +177,7 @@ impl Nazrin for MockServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
id: i32, id: i32,
) -> Result<Vec<u8>, ApiError> { ) -> Result<Vec<u8>, String> {
let db = self.db.read().await; let db = self.db.read().await;
let Some(inst) = db let Some(inst) = db
.instances .instances
@ -190,7 +185,7 @@ impl Nazrin for MockServer {
.find(|i| i.as_ref().map(|i| i.id == id).is_some()) .find(|i| i.as_ref().map(|i| i.id == id).is_some())
.and_then(|o| o.as_ref()) .and_then(|o| o.as_ref())
else { else {
return Err("No such instance".into()); return Err("No such instance".to_owned());
}; };
Ok(db.ci_userdatas.get(&inst.name).cloned().unwrap_or_default()) Ok(db.ci_userdatas.get(&inst.name).cloned().unwrap_or_default())
} }
@ -199,7 +194,7 @@ impl Nazrin for MockServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
_with_status: bool, _with_status: bool,
) -> Result<Vec<crate::model::Instance>, ApiError> { ) -> Result<Vec<crate::model::Instance>, String> {
let db = self.db.read().await; let db = self.db.read().await;
Ok(db Ok(db
.instances .instances
@ -212,7 +207,7 @@ impl Nazrin for MockServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
build_args: crate::model::Subnet, build_args: crate::model::Subnet,
) -> Result<crate::model::Subnet, ApiError> { ) -> Result<crate::model::Subnet, String> {
let mut db = self.db.write().await; let mut db = self.db.write().await;
let subnet = build_args.clone(); let subnet = build_args.clone();
db.subnets.push(Some(build_args)); db.subnets.push(Some(build_args));
@ -223,14 +218,14 @@ impl Nazrin for MockServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
_edit_args: crate::model::Subnet, _edit_args: crate::model::Subnet,
) -> Result<crate::model::Subnet, ApiError> { ) -> Result<crate::model::Subnet, String> {
todo!() todo!()
} }
async fn get_subnets( async fn get_subnets(
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
) -> Result<Vec<crate::model::Subnet>, ApiError> { ) -> Result<Vec<crate::model::Subnet>, String> {
let db = self.db.read().await; let db = self.db.read().await;
Ok(db.subnets.iter().filter_map(|net| net.clone()).collect()) Ok(db.subnets.iter().filter_map(|net| net.clone()).collect())
} }
@ -239,40 +234,35 @@ impl Nazrin for MockServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
interface: String, interface: String,
) -> Result<(), ApiError> { ) -> Result<(), String> {
let mut db = self.db.write().await; let mut db = self.db.write().await;
{ db.instances
.iter()
.filter_map(|inst| inst.as_ref())
.for_each(|inst| {
if inst.lease.subnet == interface {
todo!("what now")
}
});
let Some(subnet) = db let Some(subnet) = db
.subnets .subnets
.iter_mut() .iter_mut()
.find(|net| net.as_ref().filter(|n| n.name == interface).is_some()) .find(|net| net.as_ref().filter(|n| n.name == interface).is_some())
else { else {
return Err(ErrorType::NotFound.into()); return Err("Subnet doesn't exist".to_owned());
}; };
subnet.take(); subnet.take();
}
// Drop all instances that belong to this subnet
db.instances.iter_mut().for_each(|inst| {
if inst
.as_mut()
.filter(|inst| inst.lease.subnet != interface)
.is_some()
{
inst.take();
}
});
Ok(()) Ok(())
} }
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), ApiError> { async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), String> {
// no libvirt to compare against, no instances to GC todo!()
Ok(())
} }
async fn get_ssh_pubkeys( async fn get_ssh_pubkeys(
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
) -> Result<Vec<model::SshPubkey>, ApiError> { ) -> Result<Vec<model::SshPubkey>, String> {
let db = self.db.read().await; let db = self.db.read().await;
Ok(db Ok(db
@ -286,7 +276,7 @@ impl Nazrin for MockServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
pub_key: String, pub_key: String,
) -> Result<model::SshPubkey, ApiError> { ) -> Result<model::SshPubkey, String> {
let mut key_model = model::SshPubkey::from_str(&pub_key).map_err(|e| e.to_string())?; let mut key_model = model::SshPubkey::from_str(&pub_key).map_err(|e| e.to_string())?;
let mut db = self.db.write().await; let mut db = self.db.write().await;
key_model.id = Some(db.ssh_keys.len() as i32); key_model.id = Some(db.ssh_keys.len() as i32);
@ -294,7 +284,7 @@ impl Nazrin for MockServer {
Ok(key_model) Ok(key_model)
} }
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), ApiError> { async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), String> {
let mut db = self.db.write().await; let mut db = self.db.write().await;
if let Some(key) = db.ssh_keys.get_mut(id as usize) { if let Some(key) = db.ssh_keys.get_mut(id as usize) {
key.take(); key.take();

View file

@ -5,10 +5,7 @@ use serde::{Deserialize, Serialize};
use std::{fmt, net::Ipv4Addr}; use std::{fmt, net::Ipv4Addr};
use thiserror::Error; use thiserror::Error;
use crate::{ use crate::net::{cidr::CidrV4, mac::MacAddr};
error::ApiError,
net::{cidr::CidrV4, mac::MacAddr},
};
#[derive(Copy, Clone, Debug, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, Serialize, Deserialize)]
#[repr(u32)] #[repr(u32)]
@ -70,7 +67,7 @@ impl fmt::Display for DomainState {
pub struct CreateStatus { pub struct CreateStatus {
pub status_text: String, pub status_text: String,
pub completion: f32, pub completion: f32,
pub result: Option<Result<Instance, ApiError>>, pub result: Option<Result<Instance, String>>,
} }
/// Struct representing a VM instance. /// Struct representing a VM instance.

View file

@ -26,8 +26,9 @@ tarpc = { version = "0.34", features = [
] } ] }
# Logging # Logging
tracing = "0.1" # TODO: switch to tracing?
tracing-subscriber = "0.3" log = "0.4.17"
syslog = "7"
# Database # Database
diesel = { version = "2.2", features = [ diesel = { version = "2.2", features = [

View file

@ -1 +1,23 @@
pub mod net;
pub mod vm; pub mod vm;
use std::fmt;
#[derive(Debug)]
pub struct CommandError(String);
impl fmt::Display for CommandError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl std::error::Error for CommandError {}
macro_rules! cmd_error {
($($arg:tt)*) => {
Box::new(CommandError(format!($($arg)*)))
};
}
pub(crate) use cmd_error;

47
nzrd/src/cmd/net.rs Normal file
View file

@ -0,0 +1,47 @@
use super::*;
use crate::ctx::Context;
use crate::model::tx::Transaction;
use crate::model::Subnet;
use nzr_api::model;
pub async fn add_subnet(
ctx: &Context,
args: model::Subnet,
) -> Result<Subnet, Box<dyn std::error::Error>> {
let subnet = {
let s = Subnet::insert(ctx, args.name, args.data)
.await
.map_err(|er| cmd_error!("Couldn't generate subnet: {}", er))?;
Transaction::begin(ctx, s)
};
if let Err(err) = ctx.zones.new_zone(&subnet).await {
Err(cmd_error!("Failed to create new DNS zone: {}", err))
} else {
Ok(subnet.take())
}
}
pub async fn delete_subnet(
ctx: &Context,
name: impl AsRef<str>,
) -> Result<(), Box<dyn std::error::Error>> {
match Subnet::get_by_name(ctx, name.as_ref())
.await
.map_err(|er| cmd_error!("Couldn't find subnet: {}", er))?
{
Some(subnet) => {
if let Some(domain_name) = &subnet.domain_name {
ctx.zones.delete_zone(domain_name).await;
}
subnet
.delete(ctx)
.await
.map_err(|er| cmd_error!("Couldn't fully delete subnet entry: {}", er))
}
None => Err(cmd_error!("Subnet not found")),
}?;
Ok(())
}

View file

@ -1,4 +1,3 @@
use nzr_api::error::{ApiError, ErrorType, ToApiResult};
use nzr_api::net::cidr::CidrV4; use nzr_api::net::cidr::CidrV4;
use nzr_virt::error::DomainError; use nzr_virt::error::DomainError;
use nzr_virt::xml::build::DomainBuilder; use nzr_virt::xml::build::DomainBuilder;
@ -6,14 +5,14 @@ use nzr_virt::xml::{self, InfoMap, SerialType, Sysinfo};
use nzr_virt::{datasize, dom, vol}; use nzr_virt::{datasize, dom, vol};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use super::*;
use crate::ctrl::vm::Progress; use crate::ctrl::vm::Progress;
use crate::ctx::Context; use crate::ctx::Context;
use crate::model::tx::Transaction;
use crate::model::{Instance, Subnet}; use crate::model::{Instance, Subnet};
use log::{debug, info, warn};
use nzr_api::args;
use nzr_api::net::mac::MacAddr; use nzr_api::net::mac::MacAddr;
use nzr_api::{args, model, nzr_event};
use std::sync::Arc; use std::sync::Arc;
use tracing::{debug, info, warn};
const VIRT_MAC_OUI: &[u8] = &[0x02, 0xf1, 0x0f]; const VIRT_MAC_OUI: &[u8] = &[0x02, 0xf1, 0x0f];
@ -30,21 +29,23 @@ pub async fn new_instance(
ctx: Context, ctx: Context,
prog_task: Arc<RwLock<Progress>>, prog_task: Arc<RwLock<Progress>>,
args: &args::NewInstance, args: &args::NewInstance,
) -> Result<(Instance, dom::Domain), ApiError> { ) -> Result<(Instance, dom::Domain), Box<dyn std::error::Error>> {
progress!(prog_task, 0.0, "Starting..."); progress!(prog_task, 0.0, "Starting...");
// find the subnet corresponding to the interface // find the subnet corresponding to the interface
let subnet = Subnet::get_by_name(&ctx, &args.subnet) let subnet = Subnet::get_by_name(&ctx, &args.subnet)
.await .await
.to_api_with("Unable to get interface")? .map_err(|er| cmd_error!("Unable to get interface: {}", er))?
.ok_or::<ApiError>(format!("Subnet {} wasn't found in database", &args.subnet).into())?; .ok_or(cmd_error!(
"Subnet {} wasn't found in database",
&args.subnet
))?;
// bail if a domain already exists // bail if a domain already exists
if let Ok(dom) = ctx.virt.conn.get_instance(&args.name).await { if let Ok(dom) = ctx.virt.conn.get_instance(&args.name).await {
Err(format!( Err(cmd_error!(
"Domain with name already exists (uuid {})", "Domain with name already exists (uuid {})",
dom.xml().await.uuid, dom.xml().await.uuid,
) ))
.into())
} else { } else {
// make sure the base image exists // make sure the base image exists
let mut base_image = ctx let mut base_image = ctx
@ -53,7 +54,7 @@ pub async fn new_instance(
.baseimg .baseimg
.volume(&args.base_image) .volume(&args.base_image)
.await .await
.to_api_with("Couldn't find base image")?; .map_err(|er| cmd_error!("Couldn't find base image: {}", er))?;
progress!(prog_task, 10.0, "Generating metadata..."); progress!(prog_task, 10.0, "Generating metadata...");
// generate a new lease with a new MAC addr // generate a new lease with a new MAC addr
@ -61,23 +62,19 @@ pub async fn new_instance(
let bytes = [VIRT_MAC_OUI, rand::random::<[u8; 3]>().as_ref()].concat(); let bytes = [VIRT_MAC_OUI, rand::random::<[u8; 3]>().as_ref()].concat();
MacAddr::from_bytes(bytes) MacAddr::from_bytes(bytes)
} }
.to_api_with("Unable to create a new MAC address")?; .map_err(|er| cmd_error!("Unable to create a new MAC address: {}", er))?;
// Get highest host addr + 1 for our new addr // Get highest host addr + 1 for our new addr
let addr = { let addr = {
let addr_num = Instance::all_in_subnet(&ctx, &subnet) let addr_num = Instance::all_in_subnet(&ctx, &subnet)
.await .await?
.to_api_with("Couldn't get instances in subnet")?
.into_iter() .into_iter()
.max_by(|a, b| a.host_num.cmp(&b.host_num)) .max_by(|a, b| a.host_num.cmp(&b.host_num))
.map_or(subnet.start_host, |i| i.host_num + 1); .map_or(subnet.start_host, |i| i.host_num + 1);
if addr_num > subnet.end_host || addr_num < subnet.start_host { if addr_num > subnet.end_host || addr_num < subnet.start_host {
return Err("Got invalid lease address for instance".into()); Err(cmd_error!("Got invalid lease address for instance"))?;
} }
let addr = subnet let addr = subnet.network.make_ip(addr_num as u32)?;
.network
.make_ip(addr_num as u32)
.to_api_with("Unable to generate instance IP")?;
CidrV4::new(addr, subnet.network.cidr()) CidrV4::new(addr, subnet.network.cidr())
}; };
@ -88,12 +85,7 @@ pub async fn new_instance(
}; };
// generate cloud-init data // generate cloud-init data
let db_inst = { let db_inst = Instance::insert(&ctx, &args.name, &subnet, lease.clone(), None).await?;
let inst = Instance::insert(&ctx, &args.name, &subnet, lease.clone(), None)
.await
.to_api_type(ErrorType::Database)?;
Transaction::begin(&ctx, inst)
};
progress!(prog_task, 30.0, "Creating instance images..."); progress!(prog_task, 30.0, "Creating instance images...");
// create primary volume from base image // create primary volume from base image
@ -104,7 +96,7 @@ pub async fn new_instance(
datasize!((args.disk_sizes.0) GiB), datasize!((args.disk_sizes.0) GiB),
) )
.await .await
.to_api_with("Failed to clone base image")?; .map_err(|er| cmd_error!("Failed to clone base image: {}", er))?;
// and, if it exists: the second volume // and, if it exists: the second volume
let sec_vol = match args.disk_sizes.1 { let sec_vol = match args.disk_sizes.1 {
@ -112,11 +104,7 @@ pub async fn new_instance(
let voldata = let voldata =
// TODO: Fix VolType // TODO: Fix VolType
xml::Volume::new(&args.name, xml::VolType::Qcow2, datasize!(sec_size GiB)); xml::Volume::new(&args.name, xml::VolType::Qcow2, datasize!(sec_size GiB));
Some( Some(vol::Volume::create(&ctx.virt.pools.secondary, voldata, 0).await?)
vol::Volume::create(&ctx.virt.pools.secondary, voldata, 0)
.await
.to_api_with("Couldn't create secondary volume")?,
)
} }
None => None, None => None,
}; };
@ -180,20 +168,15 @@ pub async fn new_instance(
.build() .build()
}; };
let mut virt_dom = ctx let mut virt_dom = ctx.virt.conn.define_instance(dom_xml).await?;
.virt
.conn
.define_instance(dom_xml)
.await
.to_api_with("Couldn't define libvirt instance")?;
// not a fatal error, we can set autostart afterward // not a fatal error, we can set autostart afterward
if let Err(err) = virt_dom.autostart(true).await { if let Err(er) = virt_dom.autostart(true).await {
warn!("Couldn't set autostart for domain: {err}"); warn!("Couldn't set autostart for domain: {}", er);
} }
if let Err(err) = virt_dom.start().await { if let Err(er) = virt_dom.start().await {
warn!("Domain defined, but couldn't be started! Error: {err}"); warn!("Domain defined, but couldn't be started! Error: {}", er);
} }
// set all volumes to persistent to avoid deletion // set all volumes to persistent to avoid deletion
@ -205,76 +188,40 @@ pub async fn new_instance(
progress!(prog_task, 80.0, "Domain created!"); progress!(prog_task, 80.0, "Domain created!");
debug!("Domain {} created!", virt_dom.xml().await.name.as_str()); debug!("Domain {} created!", virt_dom.xml().await.name.as_str());
Ok((db_inst.take(), virt_dom)) Ok((db_inst, virt_dom))
} }
} }
pub async fn delete_instance( pub async fn delete_instance(ctx: Context, name: String) -> Result<(), Box<dyn std::error::Error>> {
ctx: Context, let Some(inst_db) = Instance::get_by_name(&ctx, &name).await? else {
name: String, return Err(cmd_error!("Instance {name} not found"));
) -> Result<Option<model::Instance>, ApiError> {
let Some(inst_db) = Instance::get_by_name(&ctx, &name)
.await
.to_api_with_type(ErrorType::Database, "Couldn't find instance")?
else {
return Err(ErrorType::NotFound.into());
};
let api_model = match inst_db.api_model(&ctx).await {
Ok(model) => Some(model),
Err(err) => {
warn!("Couldn't get API model to notify clients: {err}");
None
}
}; };
// First, destroy the instance // First, destroy the instance
match ctx.virt.conn.get_instance(name.clone()).await { match ctx.virt.conn.get_instance(name.clone()).await {
Ok(mut inst) => { Ok(mut inst) => {
inst.stop().await.to_api_with("Couldn't stop instance")?; inst.stop().await?;
inst.undefine(true) inst.undefine(true).await?;
.await
.to_api_with("Couldn't undefine instance")?;
} }
Err(DomainError::DomainNotFound) => { Err(DomainError::DomainNotFound) => {
warn!("Deleting instance that exists in DB but not libvirt"); warn!("Deleting instance that exists in DB but not libvirt");
} }
Err(err) => Err(ApiError::new( Err(err) => Err(err)?,
nzr_api::error::ErrorType::VirtError,
"Couldn't get instance from libvirt",
err,
))?,
} }
// Then, delete the DB entity // Then, delete the DB entity
inst_db inst_db.delete(&ctx).await?;
.delete(&ctx)
.await
.to_api_with("Couldn't delete from database")?;
Ok(api_model) Ok(())
} }
/// Delete all instances that don't have a matching libvirt domain
pub async fn prune_instances(ctx: &Context) -> Result<(), Box<dyn std::error::Error>> { pub async fn prune_instances(ctx: &Context) -> Result<(), Box<dyn std::error::Error>> {
for entity in Instance::all(ctx).await? { for entity in Instance::all(ctx).await? {
if let Err(DomainError::DomainNotFound) = ctx.virt.conn.get_instance(&entity.name).await { if let Err(err) = ctx.virt.conn.get_instance(&entity.name).await {
if err == DomainError::DomainNotFound {
info!("Invalid domain {}, deleting", &entity.name); info!("Invalid domain {}, deleting", &entity.name);
// First, get the API model to notify clients with
let api_model = match entity.api_model(ctx).await {
Ok(ent) => Some(ent),
Err(err) => {
warn!("Couldn't get api model to notify clients: {err}");
None
}
};
// then, delete by name
let name = entity.name.clone(); let name = entity.name.clone();
if let Err(err) = entity.delete(ctx).await { if let Err(err) = entity.delete(ctx).await {
warn!("Couldn't delete {}: {}", name, err); warn!("Couldn't delete {}: {}", name, err);
} }
// and assuming all goes well, notify clients
if let Some(ent) = api_model {
nzr_event!(ctx.events, Deleted, ent);
} }
} }
} }

View file

@ -3,11 +3,13 @@ use diesel::{
SqliteConnection, SqliteConnection,
}; };
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use log::trace;
use nzr_virt::{vol, Connection}; use nzr_virt::{vol, Connection};
use std::ops::Deref; use std::ops::Deref;
use thiserror::Error; use thiserror::Error;
use nzr_api::{config::Config, event::server::EventServer}; use crate::dns::ZoneData;
use nzr_api::config::Config;
use std::sync::Arc; use std::sync::Arc;
#[cfg(test)] #[cfg(test)]
@ -40,8 +42,8 @@ impl Deref for Context {
pub struct InnerCtx { pub struct InnerCtx {
pub sqldb: diesel::r2d2::Pool<ConnectionManager<SqliteConnection>>, pub sqldb: diesel::r2d2::Pool<ConnectionManager<SqliteConnection>>,
pub config: Config, pub config: Config,
pub zones: crate::dns::ZoneData,
pub virt: VirtCtx, pub virt: VirtCtx,
pub events: Arc<EventServer>,
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -58,6 +60,7 @@ pub enum ContextError {
impl InnerCtx { impl InnerCtx {
async fn new(config: Config) -> Result<Self, ContextError> { async fn new(config: Config) -> Result<Self, ContextError> {
let zones = ZoneData::new(&config.dns);
let conn = Connection::open(&config.libvirt_uri)?; let conn = Connection::open(&config.libvirt_uri)?;
let pools = PoolRefs { let pools = PoolRefs {
@ -66,7 +69,7 @@ impl InnerCtx {
baseimg: conn.get_pool(&config.storage.base_image_pool).await?, baseimg: conn.get_pool(&config.storage.base_image_pool).await?,
}; };
tracing::trace!("Connecting to database"); trace!("Connecting to database");
let db_uri = config.db_uri.clone(); let db_uri = config.db_uri.clone();
let sqldb = tokio::task::spawn_blocking(|| { let sqldb = tokio::task::spawn_blocking(|| {
let manager = ConnectionManager::<SqliteConnection>::new(db_uri); let manager = ConnectionManager::<SqliteConnection>::new(db_uri);
@ -77,7 +80,7 @@ impl InnerCtx {
.unwrap()?; .unwrap()?;
{ {
tracing::trace!("Running pending migrations"); trace!("Running pending migrations");
let mut conn = sqldb.get()?; let mut conn = sqldb.get()?;
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
conn.run_pending_migrations(MIGRATIONS) conn.run_pending_migrations(MIGRATIONS)
@ -87,13 +90,11 @@ impl InnerCtx {
.unwrap()?; .unwrap()?;
} }
let events = Arc::new(EventServer::new());
Ok(Self { Ok(Self {
sqldb, sqldb,
config, config,
zones,
virt: VirtCtx { conn, pools }, virt: VirtCtx { conn, pools },
events,
}) })
} }
} }

View file

@ -1,13 +1,14 @@
use crate::model::Subnet;
use log::*;
use nzr_api::config::DNSConfig; use nzr_api::config::DNSConfig;
use std::borrow::Borrow; use std::borrow::Borrow;
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::net::Ipv4Addr;
use std::ops::Deref; use std::ops::Deref;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{Mutex, RwLock}; use tokio::sync::{Mutex, RwLock};
use nzr_api::model::{Instance, SubnetData};
use hickory_proto::rr::Name; use hickory_proto::rr::Name;
use hickory_server::authority::{AuthorityObject, Catalog}; use hickory_server::authority::{AuthorityObject, Catalog};
use hickory_server::proto::rr::{rdata::soa, RData, RecordSet}; use hickory_server::proto::rr::{rdata::soa, RData, RecordSet};
@ -69,7 +70,7 @@ pub struct InnerZD {
} }
pub fn make_rectree_with_soa(name: &Name, config: &DNSConfig) -> BTreeMap<RrKey, RecordSet> { pub fn make_rectree_with_soa(name: &Name, config: &DNSConfig) -> BTreeMap<RrKey, RecordSet> {
tracing::debug!("Creating initial SOA for {}", &name); debug!("Creating initial SOA for {}", &name);
let mut records: BTreeMap<RrKey, RecordSet> = BTreeMap::new(); let mut records: BTreeMap<RrKey, RecordSet> = BTreeMap::new();
let soa_key = RrKey::new( let soa_key = RrKey::new(
LowerName::from(name), LowerName::from(name),
@ -118,28 +119,24 @@ impl InnerZD {
} }
/// Creates a new DNS zone for the given subnet. /// Creates a new DNS zone for the given subnet.
pub async fn new_zone( pub async fn new_zone(&self, subnet: &Subnet) -> Result<(), Box<dyn std::error::Error>> {
&self,
zone_id: impl AsRef<str>,
subnet: &SubnetData,
) -> Result<(), Box<dyn std::error::Error>> {
if let Some(name) = &subnet.domain_name { if let Some(name) = &subnet.domain_name {
let rectree = make_rectree_with_soa(name, &self.config); let name: Name = name.parse()?;
let rectree = make_rectree_with_soa(&name, &self.config);
let auth = InMemoryAuthority::new( let auth = InMemoryAuthority::new(
name.clone(), name,
rectree, rectree,
hickory_server::authority::ZoneType::Primary, hickory_server::authority::ZoneType::Primary,
false, false,
)?; )?;
self.import(zone_id.as_ref(), auth).await; self.import(&subnet.ifname.to_string(), auth).await;
} }
Ok(()) Ok(())
} }
/// Generates a zone with the given records. pub async fn import(&self, name: &str, auth: InMemoryAuthority) {
async fn import(&self, name: &str, auth: InMemoryAuthority) {
let auth_arc = Arc::new(auth); let auth_arc = Arc::new(auth);
tracing::debug!( log::debug!(
"Importing {} with {} records...", "Importing {} with {} records...",
name, name,
auth_arc.records().await.len() auth_arc.records().await.len()
@ -162,23 +159,26 @@ impl InnerZD {
} }
/// Adds a new host record in the DNS zone. /// Adds a new host record in the DNS zone.
pub async fn new_record(&self, inst: &Instance) -> Result<(), Box<dyn std::error::Error>> { pub async fn new_record(
let hostname = Name::from_str(&inst.name)?; &self,
interface: &str,
name: &str,
addr: Ipv4Addr,
) -> Result<(), Box<dyn std::error::Error>> {
let hostname = Name::from_str(name)?;
let zones = self.map.lock().await; let zones = self.map.lock().await;
let zone = zones.get(&inst.lease.subnet).unwrap_or(&self.default_zone); let zone = zones.get(interface).unwrap_or(&self.default_zone);
let fqdn = { let fqdn = {
let origin: Name = zone.origin().into(); let origin: Name = zone.origin().into();
hostname.append_domain(&origin)? hostname.append_domain(&origin)?
}; };
tracing::debug!( log::debug!(
"Creating new host entry {} in zone {}...", "Creating new host entry {} in zone {}...",
&fqdn, &fqdn,
zone.origin() zone.origin()
); );
let addr = inst.lease.addr.addr;
let record = Record::from_rdata(fqdn, 3600, RData::A(addr.into())); let record = Record::from_rdata(fqdn, 3600, RData::A(addr.into()));
zone.upsert(record, 0).await; zone.upsert(record, 0).await;
self.catalog() self.catalog()
@ -189,10 +189,14 @@ impl InnerZD {
Ok(()) Ok(())
} }
pub async fn delete_record(&self, inst: &Instance) -> Result<bool, Box<dyn std::error::Error>> { pub async fn delete_record(
let hostname = Name::from_str(&inst.name)?; &self,
interface: &str,
name: &str,
) -> Result<bool, Box<dyn std::error::Error>> {
let hostname = Name::from_str(name)?;
let mut zones = self.map.lock().await; let mut zones = self.map.lock().await;
if let Some(zone) = zones.get_mut(&inst.lease.subnet) { if let Some(zone) = zones.get_mut(interface) {
let hostname: LowerName = hostname.into(); let hostname: LowerName = hostname.into();
self.catalog.0.write().await.remove(&hostname); self.catalog.0.write().await.remove(&hostname);
let key = RrKey::new(hostname, hickory_server::proto::rr::RecordType::A); let key = RrKey::new(hostname, hickory_server::proto::rr::RecordType::A);

View file

@ -1,16 +0,0 @@
use std::io;
use tokio::net::UnixListener;
use crate::ctx::Context;
pub async fn event_server(ctx: Context) -> io::Result<()> {
let sock = UnixListener::bind(&ctx.config.rpc.events_sock)?;
loop {
// Wait until we have an available slot for a connection
ctx.events.until_available().await;
let (client, addr) = sock.accept().await?;
ctx.events.spawn(client, addr).await?;
}
}

View file

@ -1,41 +1,79 @@
mod cmd; mod cmd;
mod ctrl; mod ctrl;
mod ctx; mod ctx;
mod event; mod dns;
mod model; mod model;
mod rpc; mod rpc;
use std::str::FromStr; use hickory_server::ServerFuture;
use log::LevelFilter;
use log::*;
use model::{Instance, Subnet};
use nzr_api::config; use nzr_api::config;
use std::{net::IpAddr, str::FromStr};
use tokio::net::UdpSocket;
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cfg: config::Config = config::Config::figment().extract()?; let cfg: config::Config = config::Config::figment().extract()?;
let ctx = ctx::Context::new(cfg).await?; let ctx = ctx::Context::new(cfg).await?;
let mut bad_loglevel = false; syslog::init_unix(
let log_level = tracing::Level::from_str(&ctx.config.log_level).unwrap_or_else(|_| { syslog::Facility::LOG_DAEMON,
bad_loglevel = true; LevelFilter::from_str(ctx.config.log_level.as_str())?,
tracing::Level::WARN )?;
});
tracing_subscriber::fmt().with_max_level(log_level).init(); info!("Hydrating initial zones...");
for subnet in Subnet::all(&ctx).await? {
// A records
if let Err(err) = ctx.zones.new_zone(&subnet).await {
error!("Couldn't create zone for {}: {}", &subnet.ifname, err);
continue;
}
match Instance::all_in_subnet(&ctx, &subnet).await {
Ok(leases) => {
for lease in leases {
let Ok(lease_addr) = subnet.network.make_ip(lease.host_num as u32) else {
warn!("Ignoring {} due to lease address issue", &lease.name);
continue;
};
if bad_loglevel { if let Err(err) = ctx
tracing::warn!("Couldn't parse log level from config, defaulting to {log_level}"); .zones
.new_record(&subnet.ifname.to_string(), &lease.name, lease_addr)
.await
{
error!(
"Failed to set up lease for {} in {}: {}",
&lease.name, &subnet.ifname, err
);
}
}
}
Err(err) => {
error!("Couldn't get leases for {}: {}", &subnet.ifname, err);
continue;
}
}
} }
// Run both the RPC and events servers // DNS init
let mut dns_listener = ServerFuture::new(ctx.zones.catalog());
let dns_socket = {
let dns_ip: IpAddr = ctx.config.dns.listen_addr.parse()?;
UdpSocket::bind((dns_ip, ctx.config.dns.port)).await?
};
dns_listener.register_socket(dns_socket);
tokio::select! { tokio::select! {
res = rpc::serve(ctx.clone()) => { res = rpc::serve(ctx.clone()) => {
if let Err(err) = res { if let Err(err) = res {
tracing::error!("RPC server error: {err}"); error!("Error from RPC: {}", err);
} }
}, },
res = event::event_server(ctx.clone()) => { res = dns_listener.block_until_done() => {
if let Err(err) = res { if let Err(err) = res {
tracing::error!("Event server error: {err}"); error!("Error from DNS: {}", err);
} }
} }
} }

View file

@ -20,14 +20,12 @@ use tx::Transactable;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum ModelError { pub enum ModelError {
#[error("{0}")] #[error("Database error occurred: {0}")]
Db(#[from] diesel::result::Error), Db(#[from] diesel::result::Error),
#[error("Database pool error ({0})")] #[error("Unable to get database handle: {0}")]
Pool(#[from] diesel::r2d2::PoolError), Pool(#[from] diesel::r2d2::PoolError),
#[error("{0}")] #[error("{0}")]
Cidr(#[from] cidr::Error), Cidr(#[from] cidr::Error),
#[error("Instance belongs to a subnet that has since disappeared")]
NoSubnet,
} }
diesel::table! { diesel::table! {
@ -249,7 +247,10 @@ impl Instance {
/// Creates an [nzr_api::model::Instance] from the information available in /// Creates an [nzr_api::model::Instance] from the information available in
/// the database. /// the database.
pub async fn api_model(&self, ctx: &Context) -> Result<nzr_api::model::Instance, ModelError> { pub async fn api_model(
&self,
ctx: &Context,
) -> Result<nzr_api::model::Instance, Box<dyn std::error::Error>> {
let netid = self.subnet_id; let netid = self.subnet_id;
let Some(subnet) = ctx let Some(subnet) = ctx
.spawn_db(move |mut db| Subnet::table().find(netid).load::<Subnet>(&mut db)) .spawn_db(move |mut db| Subnet::table().find(netid).load::<Subnet>(&mut db))
@ -257,7 +258,7 @@ impl Instance {
.into_iter() .into_iter()
.next() .next()
else { else {
return Err(ModelError::NoSubnet); todo!("something went horribly wrong");
}; };
Ok(nzr_api::model::Instance { Ok(nzr_api::model::Instance {
@ -403,7 +404,7 @@ impl Subnet {
// the API. // the API.
Ok(addr) => Some(addr), Ok(addr) => Some(addr),
Err(err) => { Err(err) => {
tracing::error!( log::error!(
"Error parsing DNS server '{}' for {}: {}", "Error parsing DNS server '{}' for {}: {}",
s, s,
&self.name, &self.name,
@ -415,7 +416,7 @@ impl Subnet {
.collect(), .collect(),
domain_name: self.domain_name.as_ref().map(|s| { domain_name: self.domain_name.as_ref().map(|s| {
Name::from_str(s).unwrap_or_else(|e| { Name::from_str(s).unwrap_or_else(|e| {
tracing::error!("Error parsing DNS name for {}: {}", &self.name, e); log::error!("Error parsing DNS name for {}: {}", &self.name, e);
Name::default() Name::default()
}) })
}), }),

View file

@ -48,7 +48,7 @@ impl<'a, T: Transactable> Drop for Transaction<'a, T> {
let ctx = self.ctx.clone(); let ctx = self.ctx.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(err) = inner.undo_tx(&ctx).await { if let Err(err) = inner.undo_tx(&ctx).await {
tracing::error!("Error undoing transaction: {err}"); log::error!("Error undoing transaction: {err}");
} }
}); });
} }

View file

@ -1,6 +1,5 @@
use futures::{future, StreamExt}; use futures::{future, StreamExt};
use nzr_api::error::{ApiError, ErrorType, ToApiResult}; use nzr_api::{args, model, InstanceQuery, Nazrin};
use nzr_api::{args, model, nzr_event, InstanceQuery, Nazrin};
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use tarpc::server::{BaseChannel, Channel}; use tarpc::server::{BaseChannel, Channel};
@ -14,8 +13,8 @@ use uuid::Uuid;
use crate::cmd; use crate::cmd;
use crate::ctx::Context; use crate::ctx::Context;
use crate::model::{Instance, SshPubkey, Subnet}; use crate::model::{Instance, SshPubkey, Subnet};
use log::*;
use std::collections::HashMap; use std::collections::HashMap;
use tracing::*;
#[derive(Clone)] #[derive(Clone)]
pub struct NzrServer { pub struct NzrServer {
@ -37,7 +36,7 @@ impl Nazrin for NzrServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
build_args: args::NewInstance, build_args: args::NewInstance,
) -> Result<uuid::Uuid, ApiError> { ) -> Result<uuid::Uuid, String> {
let progress = Arc::new(RwLock::new(crate::ctrl::vm::Progress { let progress = Arc::new(RwLock::new(crate::ctrl::vm::Progress {
status_text: "Starting...".to_owned(), status_text: "Starting...".to_owned(),
percentage: 0.0, percentage: 0.0,
@ -45,11 +44,13 @@ impl Nazrin for NzrServer {
let prog_task = progress.clone(); let prog_task = progress.clone();
let build_task = tokio::spawn(async move { let build_task = tokio::spawn(async move {
let (inst, dom) = let (inst, dom) =
cmd::vm::new_instance(self.ctx.clone(), prog_task.clone(), &build_args).await?; cmd::vm::new_instance(self.ctx.clone(), prog_task.clone(), &build_args)
.await
.map_err(|e| format!("Instance creation failed: {}", e))?;
let mut api_model = inst let mut api_model = inst
.api_model(&self.ctx) .api_model(&self.ctx)
.await .await
.to_api_with("Couldn't generate API response")?; .map_err(|e| format!("Couldn't generate API response: {e}"))?;
match dom.state().await { match dom.state().await {
Ok(state) => { Ok(state) => {
api_model.state = state.into(); api_model.state = state.into();
@ -58,9 +59,6 @@ impl Nazrin for NzrServer {
warn!("Unable to get instance state: {err}"); warn!("Unable to get instance state: {err}");
} }
} }
// Inform event listeners
nzr_event!(self.ctx.events, Created, api_model);
Ok(api_model) Ok(api_model)
}); });
@ -96,7 +94,7 @@ impl Nazrin for NzrServer {
Some( Some(
task.inner task.inner
.await .await
.to_api_with("Task failed with panic") .map_err(|err| format!("Task failed with panic: {}", err))
.and_then(|res| res), .and_then(|res| res),
) )
} else { } else {
@ -110,16 +108,10 @@ impl Nazrin for NzrServer {
}) })
} }
async fn delete_instance( async fn delete_instance(self, _: tarpc::context::Context, name: String) -> Result<(), String> {
self, cmd::vm::delete_instance(self.ctx.clone(), name)
_: tarpc::context::Context, .await
name: String, .map_err(|e| format!("Couldn't delete instance: {}", e))?;
) -> Result<(), ApiError> {
let api_model = cmd::vm::delete_instance(self.ctx.clone(), name).await?;
if let Some(api_model) = api_model {
nzr_event!(self.ctx.events, Deleted, api_model);
}
Ok(()) Ok(())
} }
@ -127,16 +119,18 @@ impl Nazrin for NzrServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
query: nzr_api::InstanceQuery, query: nzr_api::InstanceQuery,
) -> Result<Option<model::Instance>, ApiError> { ) -> Result<Option<model::Instance>, String> {
let res = match query { let res = match query {
InstanceQuery::Name(name) => Instance::get_by_name(&self.ctx, name).await, InstanceQuery::Name(name) => Instance::get_by_name(&self.ctx, name).await,
InstanceQuery::MacAddr(addr) => Instance::get_by_mac(&self.ctx, addr).await, InstanceQuery::MacAddr(addr) => Instance::get_by_mac(&self.ctx, addr).await,
InstanceQuery::Ipv4Addr(addr) => Instance::get_by_ip4(&self.ctx, addr).await, InstanceQuery::Ipv4Addr(addr) => Instance::get_by_ip4(&self.ctx, addr).await,
} }
.to_api()?; .map_err(|e| e.to_string())?;
if let Some(inst) = res { if let Some(inst) = res {
inst.api_model(&self.ctx).await.to_api().map(Some) inst.api_model(&self.ctx)
.await
.map_or_else(|e| Err(e.to_string()), |m| Ok(Some(m)))
} else { } else {
Ok(None) Ok(None)
} }
@ -146,10 +140,10 @@ impl Nazrin for NzrServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
with_status: bool, with_status: bool,
) -> Result<Vec<model::Instance>, ApiError> { ) -> Result<Vec<model::Instance>, String> {
let db_models = Instance::all(&self.ctx) let db_models = Instance::all(&self.ctx)
.await .await
.to_api_type(ErrorType::Database)?; .map_err(|e| format!("Unable to get all instances: {e}"))?;
let mut models = Vec::new(); let mut models = Vec::new();
for inst in db_models { for inst in db_models {
let mut api_model = match inst.api_model(&self.ctx).await { let mut api_model = match inst.api_model(&self.ctx).await {
@ -187,39 +181,34 @@ impl Nazrin for NzrServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
build_args: model::Subnet, build_args: model::Subnet,
) -> Result<model::Subnet, ApiError> { ) -> Result<model::Subnet, String> {
let subnet = Subnet::insert(&self.ctx, build_args.name, build_args.data) cmd::net::add_subnet(&self.ctx, build_args)
.await .await
.to_api_type(ErrorType::Database)? .map_err(|e| e.to_string())?
.api_model() .api_model()
.to_api_with("Unable to generate API model")?; .map_err(|e| e.to_string())
// inform event listeners
nzr_event!(self.ctx.events, Created, subnet);
Ok(subnet)
} }
async fn modify_subnet( async fn modify_subnet(
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
edit_args: model::Subnet, edit_args: model::Subnet,
) -> Result<model::Subnet, ApiError> { ) -> Result<model::Subnet, String> {
if let Some(subnet) = Subnet::get_by_name(&self.ctx, &edit_args.name) if let Some(subnet) = Subnet::get_by_name(&self.ctx, &edit_args.name)
.await .await
.map_err(|e| e.to_string())? .map_err(|e| e.to_string())?
{ {
Err("Modifying subnets not yet supported".into()) todo!("support updating Subnets")
} else { } else {
Err(ErrorType::NotFound.into()) Err(format!("Subnet {} not found", &edit_args.name))
} }
} }
async fn get_subnets(self, _: tarpc::context::Context) -> Result<Vec<model::Subnet>, ApiError> { async fn get_subnets(self, _: tarpc::context::Context) -> Result<Vec<model::Subnet>, String> {
Subnet::all(&self.ctx) Subnet::all(&self.ctx).await.map_or_else(
.await |e| Err(e.to_string()),
.to_api_with("Couldn't get list of subnets") |v| {
.map(|v| { Ok(v.into_iter()
v.into_iter()
.filter_map(|s| match s.api_model() { .filter_map(|s| match s.api_model() {
Ok(model) => Some(model), Ok(model) => Some(model),
Err(err) => { Err(err) => {
@ -227,43 +216,23 @@ impl Nazrin for NzrServer {
None None
} }
}) })
.collect() .collect())
}) },
)
} }
async fn delete_subnet( async fn delete_subnet(
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
subnet_name: String, subnet_name: String,
) -> Result<(), ApiError> { ) -> Result<(), String> {
if let Some(subnet) = Subnet::get_by_name(&self.ctx, subnet_name) cmd::net::delete_subnet(&self.ctx, &subnet_name)
.await .await
.to_api_type(ErrorType::Database)? .map_err(|e| e.to_string())?;
{
let api_model = match subnet.api_model() {
Ok(model) => Some(model),
Err(err) => {
tracing::error!("Unable to generate model for clients: {err}");
None
}
};
subnet
.delete(&self.ctx)
.await
.to_api_type(ErrorType::Database)?;
if let Some(api_model) = api_model {
nzr_event!(&self.ctx.events, Deleted, api_model);
}
Ok(()) Ok(())
} else {
Err(ErrorType::NotFound.into())
}
} }
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), ApiError> { async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), String> {
cmd::vm::prune_instances(&self.ctx) cmd::vm::prune_instances(&self.ctx)
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
@ -274,52 +243,50 @@ impl Nazrin for NzrServer {
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
id: i32, id: i32,
) -> Result<Vec<u8>, ApiError> { ) -> Result<Vec<u8>, String> {
if let Some(db_model) = Instance::get(&self.ctx, id) let Some(db_model) = Instance::get(&self.ctx, id)
.await .await
.to_api_type(ErrorType::Database)? .map_err(|e| e.to_string())?
{ else {
return Err("Instance doesn't exist".to_owned());
};
Ok(db_model.ci_userdata.unwrap_or_default()) Ok(db_model.ci_userdata.unwrap_or_default())
} else {
Err(ErrorType::NotFound.into())
}
} }
async fn get_ssh_pubkeys( async fn get_ssh_pubkeys(
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
) -> Result<Vec<model::SshPubkey>, ApiError> { ) -> Result<Vec<model::SshPubkey>, String> {
SshPubkey::all(&self.ctx) SshPubkey::all(&self.ctx).await.map_or_else(
.await |e| Err(e.to_string()),
.to_api_type(ErrorType::Database) |k| Ok(k.iter().map(|k| k.api_model()).collect()),
.map(|k| k.iter().map(|k| k.api_model()).collect()) )
} }
async fn add_ssh_pubkey( async fn add_ssh_pubkey(
self, self,
_: tarpc::context::Context, _: tarpc::context::Context,
pub_key: String, pub_key: String,
) -> Result<model::SshPubkey, ApiError> { ) -> Result<model::SshPubkey, String> {
let pubkey = model::SshPubkey::from_str(&pub_key).to_api_type(ErrorType::Parse)?; let pubkey = model::SshPubkey::from_str(&pub_key).map_err(|e| e.to_string())?;
SshPubkey::insert(&self.ctx, pubkey.algorithm, pubkey.key_data, pubkey.comment) SshPubkey::insert(&self.ctx, pubkey.algorithm, pubkey.key_data, pubkey.comment)
.await .await
.to_api_type(ErrorType::Database) .map_err(|e| e.to_string())
.map(|k| k.api_model()) .map(|k| k.api_model())
} }
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), ApiError> { async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), String> {
if let Some(key) = SshPubkey::get(&self.ctx, id) let Some(key) = SshPubkey::get(&self.ctx, id)
.await .await
.to_api_type(ErrorType::Database)? .map_err(|e| e.to_string())?
{ else {
key.delete(&self.ctx) return Err("SSH key with ID doesn't exist".into());
.await };
.to_api_type(ErrorType::Database)?;
key.delete(&self.ctx).await.map_err(|e| e.to_string())?;
Ok(()) Ok(())
} else {
Err(ErrorType::NotFound.into())
}
} }
} }
@ -357,6 +324,7 @@ pub async fn serve(ctx: Context) -> Result<(), Box<dyn std::error::Error>> {
debug!("Listening for new connection..."); debug!("Listening for new connection...");
let (conn, _addr) = listener.accept().await?; let (conn, _addr) = listener.accept().await?;
let ctx = ctx.clone(); let ctx = ctx.clone();
// hack?
tokio::spawn(async move { tokio::spawn(async move {
let framed = codec_builder.new_framed(conn); let framed = codec_builder.new_framed(conn);
let transport = tarpc::serde_transport::new(framed, Bincode::default()); let transport = tarpc::serde_transport::new(framed, Bincode::default());
@ -372,6 +340,6 @@ pub async fn serve(ctx: Context) -> Result<(), Box<dyn std::error::Error>> {
} }
struct InstCreateStatus { struct InstCreateStatus {
inner: JoinHandle<Result<model::Instance, ApiError>>, inner: JoinHandle<Result<model::Instance, String>>,
progress: Arc<RwLock<crate::ctrl::vm::Progress>>, progress: Arc<RwLock<crate::ctrl::vm::Progress>>,
} }

View file

@ -125,7 +125,11 @@ async fn handle_message(ctx: &Context, from: SocketAddr, msg: &Message) {
{ {
let opts = response.opts_mut(); let opts = response.opts_mut();
let giaddr = msg.giaddr(); let giaddr = if msg.giaddr().is_unspecified() {
todo!("no relay??")
} else {
msg.giaddr()
};
opts.insert(DhcpOption::ServerIdentifier(giaddr)); opts.insert(DhcpOption::ServerIdentifier(giaddr));
if let Some(time) = lease_time { if let Some(time) = lease_time {

View file

@ -1,15 +0,0 @@
[package]
name = "nzrdns"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
nzr-api = { path = "../nzr-api" }
hickory-server = "0.24"
hickory-proto = { version = "0.24", features = ["serde-config"] }
tracing = "0.1"
tracing-subscriber = "0.3"
async-trait = "0.1"
futures = "0.3"
anyhow = "1"

View file

@ -1,169 +0,0 @@
use std::{net::IpAddr, process::ExitCode};
use anyhow::Context;
use dns::ZoneData;
use futures::StreamExt;
use hickory_server::ServerFuture;
use nzr_api::{
config::Config,
event::{client::EventClient, EventMessage, ResourceAction},
NazrinClient,
};
use tokio::{
io::AsyncRead,
net::{UdpSocket, UnixStream},
};
mod dns;
/// Function to handle incoming events from Nazrin and update the DNS database
/// accordingly.
async fn event_handler<T: AsyncRead>(zones: ZoneData, mut events: EventClient<T>) {
while let Some(event) = events.next().await {
match event {
Ok(EventMessage::Instance(event)) => {
let ent = &event.entity;
match event.action {
ResourceAction::Created => {
if let Err(err) = zones.new_record(ent).await {
tracing::error!("Unable to add record {}: {err}", ent.name);
}
}
ResourceAction::Deleted => {
if let Err(err) = zones.delete_record(ent).await {
tracing::error!("Unable to delete record {}: {err}", ent.name);
}
}
misc => {
tracing::debug!("ignoring instance action {misc:?}");
}
}
}
Ok(EventMessage::Subnet(event)) => {
let ent = &event.entity;
match event.action {
ResourceAction::Created => {
if let Some(name) = ent.data.domain_name.as_ref() {
if let Err(err) = zones.new_zone(&ent.name, &ent.data).await {
tracing::error!("Unable to add zone {name}: {err}");
}
}
}
ResourceAction::Deleted => {
if ent.data.domain_name.as_ref().is_some() {
zones.delete_zone(&ent.name).await;
}
}
misc => {
tracing::debug!("ignoring subnet action {misc:?}");
}
}
}
Err(err) => {
tracing::error!("Error getting events: {err}");
}
}
}
tracing::warn!("No more events! (did Nazrin shut down?)");
}
/// Hydrates all existing DNS zones.
async fn hydrate_zones(zones: ZoneData, api_client: NazrinClient) -> anyhow::Result<()> {
tracing::info!("Hydrating initial zones...");
let subnets = api_client
.get_subnets(nzr_api::default_ctx())
.await
.context("RPC error getting subnets")?
.map_err(|e| anyhow::anyhow!("API error getting subnets: {e}"))?;
let instances = api_client
.get_instances(nzr_api::default_ctx(), false)
.await
.context("RPC error getting instances")?
.map_err(|e| anyhow::anyhow!("API error getting instances: {e}"))?;
for subnet in subnets {
if let Err(err) = zones.new_zone(&subnet.name, &subnet.data).await {
tracing::warn!("Couldn't create zone for {}: {err}", &subnet.name);
}
}
for instance in instances {
if let Err(err) = zones.new_record(&instance).await {
tracing::warn!("Couldn't create zone entry for {}: {err}", &instance.name);
}
}
Ok(())
}
#[tokio::main]
async fn main() -> ExitCode {
tracing_subscriber::fmt::init();
let cfg: Config = match Config::figment().extract() {
Ok(cfg) => cfg,
Err(err) => {
tracing::error!("Error parsing config: {err}");
return ExitCode::FAILURE;
}
};
let api_client = {
let sock = match UnixStream::connect(&cfg.rpc.socket_path).await {
Ok(sock) => sock,
Err(err) => {
tracing::error!("Connection to nzrd failed: {err}");
return ExitCode::FAILURE;
}
};
nzr_api::new_client(sock)
};
let events = {
let sock = match UnixStream::connect(&cfg.rpc.events_sock).await {
Ok(sock) => sock,
Err(err) => {
tracing::error!("Connections to events stream failed: {err}");
return ExitCode::FAILURE;
}
};
nzr_api::event::client::EventClient::new(sock)
};
let zones = ZoneData::new(&cfg.dns);
if let Err(err) = hydrate_zones(zones.clone(), api_client.clone()).await {
tracing::error!("{err}");
return ExitCode::FAILURE;
}
let mut dns_listener = ServerFuture::new(zones.catalog());
let dns_socket = {
let Ok(dns_ip) = cfg.dns.listen_addr.parse::<IpAddr>() else {
tracing::error!("Unable to parse listen_addr");
return ExitCode::FAILURE;
};
match UdpSocket::bind((dns_ip, cfg.dns.port)).await {
Ok(sock) => sock,
Err(err) => {
tracing::error!("Couldn't bind to {dns_ip}:{}: {err}", cfg.dns.port);
return ExitCode::FAILURE;
}
}
};
dns_listener.register_socket(dns_socket);
tokio::select! {
_ = event_handler(zones.clone(), events) => {
// nothing to do here
},
res = dns_listener.block_until_done() => {
if let Err(err) = res {
tracing::error!("Error from DNS: {err}");
}
}
}
ExitCode::SUCCESS
}

View file

@ -57,7 +57,7 @@ impl Context {
} }
pub async fn get_sshkeys(&self) -> Result<Vec<SshPubkey>> { pub async fn get_sshkeys(&self) -> Result<Vec<SshPubkey>> {
// We don't cache SSH keys, so always get from the API server // TODO: do we cache SSH keys? I don't like the idea of it
let ssh_keys = self let ssh_keys = self
.api_client .api_client
.get_ssh_pubkeys(nzr_api::default_ctx()) .get_ssh_pubkeys(nzr_api::default_ctx())

View file

@ -41,6 +41,7 @@ async fn get_meta_data(
Ok(Some(inst)) => { Ok(Some(inst)) => {
let meta = Metadata { let meta = Metadata {
inst_name: &inst.name, inst_name: &inst.name,
// XXX: this is very silly imo
ssh_pubkeys: ssh_pubkeys.iter().collect(), ssh_pubkeys: ssh_pubkeys.iter().collect(),
}; };
@ -98,7 +99,7 @@ async fn get_vendor_data(
// admin username from an unknown instance. // admin username from an unknown instance.
if let IpAddr::V4(ip) = addr.ip() { if let IpAddr::V4(ip) = addr.ip() {
match ctx.get_instance(ip).await { match ctx.get_instance(ip).await {
Ok(Some(_)) => { Ok(_) => {
let data = model::VendorData { let data = model::VendorData {
username: Some(&ctx.cfg().cloud.admin_user), username: Some(&ctx.cfg().cloud.admin_user),
}; };
@ -107,14 +108,14 @@ async fn get_vendor_data(
StatusCode::INTERNAL_SERVER_ERROR StatusCode::INTERNAL_SERVER_ERROR
}) })
} }
Ok(None) => {
tracing::warn!("Request from unregistered server {ip}");
Err(StatusCode::FORBIDDEN)
}
Err(err) => { Err(err) => {
tracing::error!("{err}"); tracing::error!("{err}");
Err(StatusCode::INTERNAL_SERVER_ERROR) Err(StatusCode::INTERNAL_SERVER_ERROR)
} }
_ => {
tracing::warn!("Request from unregistered server {ip}");
Err(StatusCode::FORBIDDEN)
}
} }
} else { } else {
Err(StatusCode::BAD_REQUEST) Err(StatusCode::BAD_REQUEST)