nzrdns: the DNS part of nzrd, now not part of nzrd
This commit is contained in:
		
							parent
							
								
									19a08abb52
								
							
						
					
					
						commit
						d6eca32bc0
					
				
					 16 changed files with 532 additions and 113 deletions
				
			
		
							
								
								
									
										18
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										18
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							|  | @ -1560,6 +1560,7 @@ dependencies = [ | |||
| name = "nzr-api" | ||||
| version = "0.1.0" | ||||
| dependencies = [ | ||||
|  "bincode", | ||||
|  "diesel", | ||||
|  "figment", | ||||
|  "futures", | ||||
|  | @ -1571,6 +1572,8 @@ dependencies = [ | |||
|  "tarpc", | ||||
|  "thiserror", | ||||
|  "tokio", | ||||
|  "tokio-serde 0.9.0", | ||||
|  "tracing", | ||||
|  "uuid", | ||||
| ] | ||||
| 
 | ||||
|  | @ -1641,6 +1644,21 @@ dependencies = [ | |||
|  "tracing-subscriber", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "nzrdns" | ||||
| version = "0.1.0" | ||||
| dependencies = [ | ||||
|  "anyhow", | ||||
|  "async-trait", | ||||
|  "futures", | ||||
|  "hickory-proto", | ||||
|  "hickory-server", | ||||
|  "nzr-api", | ||||
|  "tokio", | ||||
|  "tracing", | ||||
|  "tracing-subscriber", | ||||
| ] | ||||
| 
 | ||||
| [[package]] | ||||
| name = "object" | ||||
| version = "0.36.2" | ||||
|  |  | |||
|  | @ -1,3 +1,3 @@ | |||
| [workspace] | ||||
| members = ["nzrd", "nzr-api", "client", "nzrdhcp", "nzr-virt", "omyacid"] | ||||
| members = ["nzrd", "nzr-api", "client", "nzrdhcp", "nzr-virt", "omyacid", "nzrdns"] | ||||
| resolver = "2" | ||||
|  |  | |||
|  | @ -21,6 +21,9 @@ futures = { version = "0.3", optional = true } | |||
| thiserror = "1" | ||||
| regex = "1" | ||||
| lazy_static = "1" | ||||
| tracing = "0.1" | ||||
| tokio-serde = { version = "0.9", features = ["bincode"] } | ||||
| bincode = "1.3" | ||||
| 
 | ||||
| [dev-dependencies] | ||||
| uuid = { version = "1.2.2", features = ["serde", "v4"] } | ||||
|  |  | |||
|  | @ -72,6 +72,7 @@ impl CloudConfig { | |||
| pub struct RPCConfig { | ||||
|     pub socket_path: PathBuf, | ||||
|     pub admin_group: Option<String>, | ||||
|     pub events_sock: PathBuf, | ||||
| } | ||||
| 
 | ||||
| /// The root configuration struct.
 | ||||
|  | @ -98,6 +99,7 @@ impl Default for Config { | |||
|             rpc: RPCConfig { | ||||
|                 socket_path: PathBuf::from("/var/run/nazrin/nzrd.sock"), | ||||
|                 admin_group: None, | ||||
|                 events_sock: PathBuf::from("/var/run/nazrin/events.sock"), | ||||
|             }, | ||||
|             db_uri: "sqlite:/var/lib/nazrin/main_sql.db".to_owned(), | ||||
|             libvirt_uri: match std::env::var("LIBVIRT_URI") { | ||||
|  |  | |||
							
								
								
									
										53
									
								
								nzr-api/src/event/client.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								nzr-api/src/event/client.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,53 @@ | |||
| use std::{pin::Pin, task::Poll}; | ||||
| 
 | ||||
| use futures::{Stream, TryStreamExt}; | ||||
| use tarpc::tokio_util::codec::{FramedRead, LengthDelimitedCodec}; | ||||
| use tokio::io::AsyncRead; | ||||
| 
 | ||||
| use super::{EventError, EventMessage}; | ||||
| 
 | ||||
| /// Client for receiving various events emitted by Nazrin.
 | ||||
| pub struct EventClient<T> | ||||
| where | ||||
|     T: AsyncRead, | ||||
| { | ||||
|     transport: Pin<Box<FramedRead<T, LengthDelimitedCodec>>>, | ||||
| } | ||||
| 
 | ||||
| impl<T> EventClient<T> | ||||
| where | ||||
|     T: AsyncRead, | ||||
| { | ||||
|     /// Creates a new EventClient.
 | ||||
|     pub fn new(inner: T) -> Self { | ||||
|         let transport = FramedRead::new(inner, LengthDelimitedCodec::new()); | ||||
|         Self { | ||||
|             transport: Box::pin(transport), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<T> Stream for EventClient<T> | ||||
| where | ||||
|     T: AsyncRead, | ||||
| { | ||||
|     type Item = Result<EventMessage, EventError>; | ||||
| 
 | ||||
|     fn poll_next( | ||||
|         mut self: Pin<&mut Self>, | ||||
|         cx: &mut std::task::Context<'_>, | ||||
|     ) -> Poll<Option<Self::Item>> { | ||||
|         match self.as_mut().transport.try_poll_next_unpin(cx) { | ||||
|             Poll::Ready(res) => { | ||||
|                 let our_res = res.map(|res| { | ||||
|                     res.map_err(|e| e.into()).and_then(|bytes| { | ||||
|                         let msg: EventMessage = bincode::deserialize(&bytes)?; | ||||
|                         Ok(msg) | ||||
|                     }) | ||||
|                 }); | ||||
|                 Poll::Ready(our_res) | ||||
|             } | ||||
|             Poll::Pending => Poll::Pending, | ||||
|         } | ||||
|     } | ||||
| } | ||||
							
								
								
									
										75
									
								
								nzr-api/src/event/mod.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								nzr-api/src/event/mod.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,75 @@ | |||
| pub mod client; | ||||
| pub mod server; | ||||
| 
 | ||||
| use std::io; | ||||
| 
 | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use thiserror::Error; | ||||
| 
 | ||||
| use crate::model; | ||||
| 
 | ||||
| #[derive(Clone, Serialize, Deserialize)] | ||||
| pub enum ResourceAction { | ||||
|     /// The referenced resource was created.
 | ||||
|     Created, | ||||
|     /// The referenced resource was deleted, and is no longer available.
 | ||||
|     Deleted, | ||||
|     /// The referenced resource was modified in some way.
 | ||||
|     Modified, | ||||
| } | ||||
| 
 | ||||
| /// Represents an event pertaining to a specific action.
 | ||||
| #[derive(Clone, Serialize, Deserialize)] | ||||
| pub struct ResourceEvent<T> { | ||||
|     pub action: ResourceAction, | ||||
|     /// The entity that was acted upon.
 | ||||
|     pub entity: T, | ||||
| } | ||||
| 
 | ||||
| /// Represents any event that is emitted by Nazrin.
 | ||||
| #[derive(Clone, Serialize, Deserialize)] | ||||
| #[serde(tag = "event")] | ||||
| pub enum EventMessage { | ||||
|     /// A subnet was created, modified, or deleted.
 | ||||
|     Subnet(ResourceEvent<model::Subnet>), | ||||
|     /// An instance was created, modified, or deleted.
 | ||||
|     Instance(ResourceEvent<model::Instance>), | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Error)] | ||||
| pub enum EventError { | ||||
|     #[error("Transport error: {0}")] | ||||
|     Transport(#[from] io::Error), | ||||
|     #[error("Serialization error: {0}")] | ||||
|     Bincode(#[from] bincode::Error), | ||||
| } | ||||
| 
 | ||||
| pub trait Emittable { | ||||
|     fn as_event(&self, action: ResourceAction) -> EventMessage; | ||||
| } | ||||
| 
 | ||||
| macro_rules! emittable { | ||||
|     ($t:ty, $msg:ident) => { | ||||
|         impl Emittable for $t { | ||||
|             fn as_event(&self, action: ResourceAction) -> EventMessage { | ||||
|                 EventMessage::$msg(ResourceEvent { | ||||
|                     action, | ||||
|                     entity: self.clone(), | ||||
|                 }) | ||||
|             } | ||||
|         } | ||||
|     }; | ||||
| } | ||||
| 
 | ||||
| emittable!(model::Instance, Instance); | ||||
| emittable!(model::Subnet, Subnet); | ||||
| 
 | ||||
| #[macro_export] | ||||
| macro_rules! nzr_event { | ||||
|     ($srv:expr, $act:ident, $ent:tt) => {{ | ||||
|         use $crate::event::Emittable; | ||||
| 
 | ||||
|         $srv.emit($ent.as_event($crate::event::ResourceAction::$act)) | ||||
|             .await | ||||
|     }}; | ||||
| } | ||||
							
								
								
									
										115
									
								
								nzr-api/src/event/server.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										115
									
								
								nzr-api/src/event/server.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,115 @@ | |||
| use std::{io, net::SocketAddr, pin::Pin}; | ||||
| 
 | ||||
| use futures::SinkExt; | ||||
| use tarpc::tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; | ||||
| use tokio::{ | ||||
|     io::AsyncWrite, | ||||
|     sync::broadcast::{self, Receiver, Sender}, | ||||
| }; | ||||
| use tracing::instrument; | ||||
| 
 | ||||
| use super::EventMessage; | ||||
| 
 | ||||
| /// Represents a connection to a client. Instead of being owned by the server
 | ||||
| /// struct, a [`tokio::sync::broadcast::Receiver`] is used to get the serialized
 | ||||
| /// message and pass it to the client.
 | ||||
| ///
 | ||||
| /// [`tokio::sync::broadcast::Receiver`]: tokio::sync::broadcast::Receiver
 | ||||
| struct EventEmitter<T> | ||||
| where | ||||
|     T: AsyncWrite + Send + 'static, | ||||
| { | ||||
|     transport: Pin<Box<FramedWrite<T, LengthDelimitedCodec>>>, | ||||
|     client_addr: SocketAddr, | ||||
|     channel: Receiver<Vec<u8>>, | ||||
| } | ||||
| 
 | ||||
| impl<T> EventEmitter<T> | ||||
| where | ||||
|     T: AsyncWrite + Send + 'static, | ||||
| { | ||||
|     fn new(inner: T, client_addr: SocketAddr, channel: Receiver<Vec<u8>>) -> Self { | ||||
|         let transport = FramedWrite::new(inner, LengthDelimitedCodec::new()); | ||||
|         Self { | ||||
|             transport: Box::pin(transport), | ||||
|             client_addr, | ||||
|             channel, | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     #[instrument(skip(self), fields(client = %self.client_addr))] | ||||
|     async fn handler(&mut self) -> bool { | ||||
|         match self.channel.recv().await { | ||||
|             Ok(msg) => { | ||||
|                 if let Err(err) = self.transport.send(msg.into()).await { | ||||
|                     tracing::error!("Couldn't write to client: {err}"); | ||||
|                     false | ||||
|                 } else { | ||||
|                     true | ||||
|                 } | ||||
|             } | ||||
|             Err(err) => { | ||||
|                 tracing::error!("IPC error: {err}"); | ||||
|                 false | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn run(mut self) { | ||||
|         tokio::spawn(async move { while self.handler().await {} }); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// Handles the creation and sending of events to clients.
 | ||||
| pub struct EventServer { | ||||
|     channel: Sender<Vec<u8>>, | ||||
| } | ||||
| 
 | ||||
| // TODO: consider letting this be configurable
 | ||||
| const MAX_RECEIVERS: usize = 16; | ||||
| 
 | ||||
| impl EventServer { | ||||
|     /// Creates a new EventServer.
 | ||||
|     pub fn new() -> Self { | ||||
|         let (channel, _) = broadcast::channel(MAX_RECEIVERS); | ||||
|         Self { channel } | ||||
|     } | ||||
| 
 | ||||
|     /// Spawns a new [`EventEmitter`] where events will be sent to.
 | ||||
|     pub async fn spawn<T: AsyncWrite + Send + 'static>( | ||||
|         &self, | ||||
|         inner: T, | ||||
|         client_addr: SocketAddr, | ||||
|     ) -> io::Result<()> { | ||||
|         // Sender<T> doesn't have a try_subscribe, so this is our last-ditch
 | ||||
|         // effort to avoid a panic
 | ||||
|         if self.channel.receiver_count() + 1 > MAX_RECEIVERS { | ||||
|             todo!("Too many connections!"); | ||||
|         } | ||||
| 
 | ||||
|         EventEmitter::new(inner, client_addr, self.channel.subscribe()).run(); | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     /// Send the given event to all connected clients.
 | ||||
|     pub async fn emit(&self, msg: EventMessage) { | ||||
|         let bytes = match bincode::serialize(&msg) { | ||||
|             Ok(bytes) => bytes, | ||||
|             Err(err) => { | ||||
|                 tracing::error!("Failed to serialize: {err}"); | ||||
|                 return; | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         if self.channel.send(bytes).is_err() { | ||||
|             tracing::debug!("Tried to emit an event, but no clients were around to hear it"); | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl Default for EventServer { | ||||
|     fn default() -> Self { | ||||
|         Self::new() | ||||
|     } | ||||
| } | ||||
|  | @ -4,6 +4,7 @@ use model::{CreateStatus, Instance, SshPubkey, Subnet}; | |||
| 
 | ||||
| pub mod args; | ||||
| pub mod config; | ||||
| pub mod event; | ||||
| #[cfg(feature = "mock")] | ||||
| pub mod mock; | ||||
| pub mod model; | ||||
|  |  | |||
|  | @ -15,11 +15,7 @@ pub async fn add_subnet( | |||
|         Transaction::begin(ctx, s) | ||||
|     }; | ||||
| 
 | ||||
|     if let Err(err) = ctx.zones.new_zone(&subnet).await { | ||||
|         Err(cmd_error!("Failed to create new DNS zone: {}", err)) | ||||
|     } else { | ||||
|         Ok(subnet.take()) | ||||
|     } | ||||
|     Ok(subnet.take()) | ||||
| } | ||||
| 
 | ||||
| pub async fn delete_subnet( | ||||
|  | @ -31,9 +27,7 @@ pub async fn delete_subnet( | |||
|         .map_err(|er| cmd_error!("Couldn't find subnet: {}", er))? | ||||
|     { | ||||
|         Some(subnet) => { | ||||
|             if let Some(domain_name) = &subnet.domain_name { | ||||
|                 ctx.zones.delete_zone(domain_name).await; | ||||
|             } | ||||
|             // TODO: notify clients
 | ||||
| 
 | ||||
|             subnet | ||||
|                 .delete(ctx) | ||||
|  |  | |||
|  | @ -10,8 +10,8 @@ use crate::ctrl::vm::Progress; | |||
| use crate::ctx::Context; | ||||
| use crate::model::{Instance, Subnet}; | ||||
| use log::{debug, info, warn}; | ||||
| use nzr_api::args; | ||||
| use nzr_api::net::mac::MacAddr; | ||||
| use nzr_api::{args, model, nzr_event}; | ||||
| use std::sync::Arc; | ||||
| 
 | ||||
| const VIRT_MAC_OUI: &[u8] = &[0x02, 0xf1, 0x0f]; | ||||
|  | @ -192,10 +192,20 @@ pub async fn new_instance( | |||
|     } | ||||
| } | ||||
| 
 | ||||
| pub async fn delete_instance(ctx: Context, name: String) -> Result<(), Box<dyn std::error::Error>> { | ||||
| pub async fn delete_instance( | ||||
|     ctx: Context, | ||||
|     name: String, | ||||
| ) -> Result<Option<model::Instance>, Box<dyn std::error::Error>> { | ||||
|     let Some(inst_db) = Instance::get_by_name(&ctx, &name).await? else { | ||||
|         return Err(cmd_error!("Instance {name} not found")); | ||||
|     }; | ||||
|     let api_model = match inst_db.api_model(&ctx).await { | ||||
|         Ok(model) => Some(model), | ||||
|         Err(err) => { | ||||
|             warn!("Couldn't get API model to notify clients: {err}"); | ||||
|             None | ||||
|         } | ||||
|     }; | ||||
|     // First, destroy the instance
 | ||||
|     match ctx.virt.conn.get_instance(name.clone()).await { | ||||
|         Ok(mut inst) => { | ||||
|  | @ -210,18 +220,32 @@ pub async fn delete_instance(ctx: Context, name: String) -> Result<(), Box<dyn s | |||
|     // Then, delete the DB entity
 | ||||
|     inst_db.delete(&ctx).await?; | ||||
| 
 | ||||
|     Ok(()) | ||||
|     Ok(api_model) | ||||
| } | ||||
| 
 | ||||
| /// Delete all instances that don't have a matching libvirt domain
 | ||||
| pub async fn prune_instances(ctx: &Context) -> Result<(), Box<dyn std::error::Error>> { | ||||
|     for entity in Instance::all(ctx).await? { | ||||
|         if let Err(err) = ctx.virt.conn.get_instance(&entity.name).await { | ||||
|             if err == DomainError::DomainNotFound { | ||||
|                 info!("Invalid domain {}, deleting", &entity.name); | ||||
|                 let name = entity.name.clone(); | ||||
|                 if let Err(err) = entity.delete(ctx).await { | ||||
|                     warn!("Couldn't delete {}: {}", name, err); | ||||
|         if let Err(DomainError::DomainNotFound) = ctx.virt.conn.get_instance(&entity.name).await { | ||||
|             info!("Invalid domain {}, deleting", &entity.name); | ||||
|             // First, get the API model to notify clients with
 | ||||
|             let api_model = match entity.api_model(ctx).await { | ||||
|                 Ok(ent) => Some(ent), | ||||
|                 Err(err) => { | ||||
|                     warn!("Couldn't get api model to notify clients: {err}"); | ||||
|                     None | ||||
|                 } | ||||
|             }; | ||||
| 
 | ||||
|             // then, delete by name
 | ||||
|             let name = entity.name.clone(); | ||||
|             if let Err(err) = entity.delete(ctx).await { | ||||
|                 warn!("Couldn't delete {}: {}", name, err); | ||||
|             } | ||||
| 
 | ||||
|             // and assuming all goes well, notify clients
 | ||||
|             if let Some(ent) = api_model { | ||||
|                 nzr_event!(ctx.events, Deleted, ent); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  |  | |||
|  | @ -8,8 +8,7 @@ use nzr_virt::{vol, Connection}; | |||
| use std::ops::Deref; | ||||
| use thiserror::Error; | ||||
| 
 | ||||
| use crate::dns::ZoneData; | ||||
| use nzr_api::config::Config; | ||||
| use nzr_api::{config::Config, event::server::EventServer}; | ||||
| use std::sync::Arc; | ||||
| 
 | ||||
| #[cfg(test)] | ||||
|  | @ -42,8 +41,8 @@ impl Deref for Context { | |||
| pub struct InnerCtx { | ||||
|     pub sqldb: diesel::r2d2::Pool<ConnectionManager<SqliteConnection>>, | ||||
|     pub config: Config, | ||||
|     pub zones: crate::dns::ZoneData, | ||||
|     pub virt: VirtCtx, | ||||
|     pub events: Arc<EventServer>, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Error)] | ||||
|  | @ -60,7 +59,6 @@ pub enum ContextError { | |||
| 
 | ||||
| impl InnerCtx { | ||||
|     async fn new(config: Config) -> Result<Self, ContextError> { | ||||
|         let zones = ZoneData::new(&config.dns); | ||||
|         let conn = Connection::open(&config.libvirt_uri)?; | ||||
| 
 | ||||
|         let pools = PoolRefs { | ||||
|  | @ -90,11 +88,13 @@ impl InnerCtx { | |||
|             .unwrap()?; | ||||
|         } | ||||
| 
 | ||||
|         let events = Arc::new(EventServer::new()); | ||||
| 
 | ||||
|         Ok(Self { | ||||
|             sqldb, | ||||
|             config, | ||||
|             zones, | ||||
|             virt: VirtCtx { conn, pools }, | ||||
|             events, | ||||
|         }) | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -1,17 +1,13 @@ | |||
| mod cmd; | ||||
| mod ctrl; | ||||
| mod ctx; | ||||
| mod dns; | ||||
| mod model; | ||||
| mod rpc; | ||||
| 
 | ||||
| use hickory_server::ServerFuture; | ||||
| use log::LevelFilter; | ||||
| use log::*; | ||||
| use model::{Instance, Subnet}; | ||||
| use nzr_api::config; | ||||
| use std::{net::IpAddr, str::FromStr}; | ||||
| use tokio::net::UdpSocket; | ||||
| use std::str::FromStr; | ||||
| 
 | ||||
| #[tokio::main(flavor = "multi_thread")] | ||||
| async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||||
|  | @ -23,59 +19,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { | |||
|         LevelFilter::from_str(ctx.config.log_level.as_str())?, | ||||
|     )?; | ||||
| 
 | ||||
|     info!("Hydrating initial zones..."); | ||||
|     for subnet in Subnet::all(&ctx).await? { | ||||
|         // A records
 | ||||
|         if let Err(err) = ctx.zones.new_zone(&subnet).await { | ||||
|             error!("Couldn't create zone for {}: {}", &subnet.ifname, err); | ||||
|             continue; | ||||
|         } | ||||
|         match Instance::all_in_subnet(&ctx, &subnet).await { | ||||
|             Ok(leases) => { | ||||
|                 for lease in leases { | ||||
|                     let Ok(lease_addr) = subnet.network.make_ip(lease.host_num as u32) else { | ||||
|                         warn!("Ignoring {} due to lease address issue", &lease.name); | ||||
|                         continue; | ||||
|                     }; | ||||
| 
 | ||||
|                     if let Err(err) = ctx | ||||
|                         .zones | ||||
|                         .new_record(&subnet.ifname.to_string(), &lease.name, lease_addr) | ||||
|                         .await | ||||
|                     { | ||||
|                         error!( | ||||
|                             "Failed to set up lease for {} in {}: {}", | ||||
|                             &lease.name, &subnet.ifname, err | ||||
|                         ); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             Err(err) => { | ||||
|                 error!("Couldn't get leases for {}: {}", &subnet.ifname, err); | ||||
|                 continue; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     // DNS init
 | ||||
|     let mut dns_listener = ServerFuture::new(ctx.zones.catalog()); | ||||
|     let dns_socket = { | ||||
|         let dns_ip: IpAddr = ctx.config.dns.listen_addr.parse()?; | ||||
|         UdpSocket::bind((dns_ip, ctx.config.dns.port)).await? | ||||
|     }; | ||||
|     dns_listener.register_socket(dns_socket); | ||||
| 
 | ||||
|     tokio::select! { | ||||
|         res = rpc::serve(ctx.clone()) => { | ||||
|             if let Err(err) = res { | ||||
|                 error!("Error from RPC: {}", err); | ||||
|             } | ||||
|         }, | ||||
|         res = dns_listener.block_until_done() => { | ||||
|             if let Err(err) = res { | ||||
|                 error!("Error from DNS: {}", err); | ||||
|             } | ||||
|         } | ||||
|     if let Err(err) = rpc::serve(ctx.clone()).await { | ||||
|         error!("Error from RPC: {}", err); | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
|  |  | |||
|  | @ -1,5 +1,5 @@ | |||
| use futures::{future, StreamExt}; | ||||
| use nzr_api::{args, model, InstanceQuery, Nazrin}; | ||||
| use nzr_api::{args, model, nzr_event, InstanceQuery, Nazrin}; | ||||
| use std::str::FromStr; | ||||
| use std::sync::Arc; | ||||
| use tarpc::server::{BaseChannel, Channel}; | ||||
|  | @ -59,6 +59,9 @@ impl Nazrin for NzrServer { | |||
|                     warn!("Unable to get instance state: {err}"); | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             // Inform event listeners
 | ||||
|             nzr_event!(self.ctx.events, Created, api_model); | ||||
|             Ok(api_model) | ||||
|         }); | ||||
| 
 | ||||
|  | @ -109,9 +112,13 @@ impl Nazrin for NzrServer { | |||
|     } | ||||
| 
 | ||||
|     async fn delete_instance(self, _: tarpc::context::Context, name: String) -> Result<(), String> { | ||||
|         cmd::vm::delete_instance(self.ctx.clone(), name) | ||||
|         let api_model = cmd::vm::delete_instance(self.ctx.clone(), name) | ||||
|             .await | ||||
|             .map_err(|e| format!("Couldn't delete instance: {}", e))?; | ||||
| 
 | ||||
|         if let Some(api_model) = api_model { | ||||
|             nzr_event!(self.ctx.events, Deleted, api_model); | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|  | @ -182,11 +189,15 @@ impl Nazrin for NzrServer { | |||
|         _: tarpc::context::Context, | ||||
|         build_args: model::Subnet, | ||||
|     ) -> Result<model::Subnet, String> { | ||||
|         cmd::net::add_subnet(&self.ctx, build_args) | ||||
|         let subnet = cmd::net::add_subnet(&self.ctx, build_args) | ||||
|             .await | ||||
|             .map_err(|e| e.to_string())? | ||||
|             .api_model() | ||||
|             .map_err(|e| e.to_string()) | ||||
|             .map_err(|e| e.to_string())?; | ||||
| 
 | ||||
|         // inform event listeners
 | ||||
|         nzr_event!(self.ctx.events, Created, subnet); | ||||
|         Ok(subnet) | ||||
|     } | ||||
| 
 | ||||
|     async fn modify_subnet( | ||||
|  | @ -198,7 +209,7 @@ impl Nazrin for NzrServer { | |||
|             .await | ||||
|             .map_err(|e| e.to_string())? | ||||
|         { | ||||
|             todo!("support updating Subnets") | ||||
|             Err("Modifying subnets not yet supported".into()) | ||||
|         } else { | ||||
|             Err(format!("Subnet {} not found", &edit_args.name)) | ||||
|         } | ||||
|  |  | |||
							
								
								
									
										15
									
								
								nzrdns/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										15
									
								
								nzrdns/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,15 @@ | |||
| [package] | ||||
| name = "nzrdns" | ||||
| version = "0.1.0" | ||||
| edition = "2021" | ||||
| 
 | ||||
| [dependencies] | ||||
| tokio = { version = "1", features = ["macros", "rt-multi-thread"] } | ||||
| nzr-api = { path = "../nzr-api" } | ||||
| hickory-server = "0.24" | ||||
| hickory-proto = { version = "0.24", features = ["serde-config"] } | ||||
| tracing = "0.1" | ||||
| tracing-subscriber = "0.3" | ||||
| async-trait = "0.1" | ||||
| futures = "0.3" | ||||
| anyhow = "1" | ||||
|  | @ -1,14 +1,13 @@ | |||
| use crate::model::Subnet; | ||||
| use log::*; | ||||
| use nzr_api::config::DNSConfig; | ||||
| use std::borrow::Borrow; | ||||
| use std::collections::{BTreeMap, HashMap}; | ||||
| use std::net::Ipv4Addr; | ||||
| use std::ops::Deref; | ||||
| use std::str::FromStr; | ||||
| use std::sync::Arc; | ||||
| use tokio::sync::{Mutex, RwLock}; | ||||
| 
 | ||||
| use nzr_api::model::{Instance, SubnetData}; | ||||
| 
 | ||||
| use hickory_proto::rr::Name; | ||||
| use hickory_server::authority::{AuthorityObject, Catalog}; | ||||
| use hickory_server::proto::rr::{rdata::soa, RData, RecordSet}; | ||||
|  | @ -70,7 +69,7 @@ pub struct InnerZD { | |||
| } | ||||
| 
 | ||||
| pub fn make_rectree_with_soa(name: &Name, config: &DNSConfig) -> BTreeMap<RrKey, RecordSet> { | ||||
|     debug!("Creating initial SOA for {}", &name); | ||||
|     tracing::debug!("Creating initial SOA for {}", &name); | ||||
|     let mut records: BTreeMap<RrKey, RecordSet> = BTreeMap::new(); | ||||
|     let soa_key = RrKey::new( | ||||
|         LowerName::from(name), | ||||
|  | @ -119,24 +118,28 @@ impl InnerZD { | |||
|     } | ||||
| 
 | ||||
|     /// Creates a new DNS zone for the given subnet.
 | ||||
|     pub async fn new_zone(&self, subnet: &Subnet) -> Result<(), Box<dyn std::error::Error>> { | ||||
|     pub async fn new_zone( | ||||
|         &self, | ||||
|         zone_id: impl AsRef<str>, | ||||
|         subnet: &SubnetData, | ||||
|     ) -> Result<(), Box<dyn std::error::Error>> { | ||||
|         if let Some(name) = &subnet.domain_name { | ||||
|             let name: Name = name.parse()?; | ||||
|             let rectree = make_rectree_with_soa(&name, &self.config); | ||||
|             let rectree = make_rectree_with_soa(name, &self.config); | ||||
|             let auth = InMemoryAuthority::new( | ||||
|                 name, | ||||
|                 name.clone(), | ||||
|                 rectree, | ||||
|                 hickory_server::authority::ZoneType::Primary, | ||||
|                 false, | ||||
|             )?; | ||||
|             self.import(&subnet.ifname.to_string(), auth).await; | ||||
|             self.import(zone_id.as_ref(), auth).await; | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn import(&self, name: &str, auth: InMemoryAuthority) { | ||||
|     /// Generates a zone with the given records.
 | ||||
|     async fn import(&self, name: &str, auth: InMemoryAuthority) { | ||||
|         let auth_arc = Arc::new(auth); | ||||
|         log::debug!( | ||||
|         tracing::debug!( | ||||
|             "Importing {} with {} records...", | ||||
|             name, | ||||
|             auth_arc.records().await.len() | ||||
|  | @ -159,26 +162,23 @@ impl InnerZD { | |||
|     } | ||||
| 
 | ||||
|     /// Adds a new host record in the DNS zone.
 | ||||
|     pub async fn new_record( | ||||
|         &self, | ||||
|         interface: &str, | ||||
|         name: &str, | ||||
|         addr: Ipv4Addr, | ||||
|     ) -> Result<(), Box<dyn std::error::Error>> { | ||||
|         let hostname = Name::from_str(name)?; | ||||
|     pub async fn new_record(&self, inst: &Instance) -> Result<(), Box<dyn std::error::Error>> { | ||||
|         let hostname = Name::from_str(&inst.name)?; | ||||
|         let zones = self.map.lock().await; | ||||
|         let zone = zones.get(interface).unwrap_or(&self.default_zone); | ||||
|         let zone = zones.get(&inst.lease.subnet).unwrap_or(&self.default_zone); | ||||
|         let fqdn = { | ||||
|             let origin: Name = zone.origin().into(); | ||||
|             hostname.append_domain(&origin)? | ||||
|         }; | ||||
| 
 | ||||
|         log::debug!( | ||||
|         tracing::debug!( | ||||
|             "Creating new host entry {} in zone {}...", | ||||
|             &fqdn, | ||||
|             zone.origin() | ||||
|         ); | ||||
| 
 | ||||
|         let addr = inst.lease.addr.addr; | ||||
| 
 | ||||
|         let record = Record::from_rdata(fqdn, 3600, RData::A(addr.into())); | ||||
|         zone.upsert(record, 0).await; | ||||
|         self.catalog() | ||||
|  | @ -189,14 +189,10 @@ impl InnerZD { | |||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn delete_record( | ||||
|         &self, | ||||
|         interface: &str, | ||||
|         name: &str, | ||||
|     ) -> Result<bool, Box<dyn std::error::Error>> { | ||||
|         let hostname = Name::from_str(name)?; | ||||
|     pub async fn delete_record(&self, inst: &Instance) -> Result<bool, Box<dyn std::error::Error>> { | ||||
|         let hostname = Name::from_str(&inst.name)?; | ||||
|         let mut zones = self.map.lock().await; | ||||
|         if let Some(zone) = zones.get_mut(interface) { | ||||
|         if let Some(zone) = zones.get_mut(&inst.lease.subnet) { | ||||
|             let hostname: LowerName = hostname.into(); | ||||
|             self.catalog.0.write().await.remove(&hostname); | ||||
|             let key = RrKey::new(hostname, hickory_server::proto::rr::RecordType::A); | ||||
							
								
								
									
										167
									
								
								nzrdns/src/main.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										167
									
								
								nzrdns/src/main.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,167 @@ | |||
| use std::{net::IpAddr, process::ExitCode}; | ||||
| 
 | ||||
| use anyhow::Context; | ||||
| use dns::ZoneData; | ||||
| use futures::StreamExt; | ||||
| use hickory_server::ServerFuture; | ||||
| use nzr_api::{ | ||||
|     config::Config, | ||||
|     event::{client::EventClient, EventMessage, ResourceAction}, | ||||
|     NazrinClient, | ||||
| }; | ||||
| use tokio::{ | ||||
|     io::AsyncRead, | ||||
|     net::{UdpSocket, UnixStream}, | ||||
| }; | ||||
| 
 | ||||
| mod dns; | ||||
| 
 | ||||
| /// Function to handle incoming events from Nazrin and update the DNS database
 | ||||
| /// accordingly.
 | ||||
| async fn event_handler<T: AsyncRead>(zones: ZoneData, mut events: EventClient<T>) { | ||||
|     while let Some(event) = events.next().await { | ||||
|         match event { | ||||
|             Ok(EventMessage::Instance(event)) => { | ||||
|                 let ent = &event.entity; | ||||
|                 match event.action { | ||||
|                     ResourceAction::Created => { | ||||
|                         if let Err(err) = zones.new_record(ent).await { | ||||
|                             tracing::error!("Unable to add record {}: {err}", ent.name); | ||||
|                         } | ||||
|                     } | ||||
|                     ResourceAction::Deleted => { | ||||
|                         if let Err(err) = zones.delete_record(ent).await { | ||||
|                             tracing::error!("Unable to delete record {}: {err}", ent.name); | ||||
|                         } | ||||
|                     } | ||||
|                     ResourceAction::Modified => { | ||||
|                         todo!(); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             Ok(EventMessage::Subnet(event)) => { | ||||
|                 let ent = &event.entity; | ||||
|                 match event.action { | ||||
|                     ResourceAction::Created => { | ||||
|                         if let Some(name) = ent.data.domain_name.as_ref() { | ||||
|                             if let Err(err) = zones.new_zone(&ent.name, &ent.data).await { | ||||
|                                 tracing::error!("Unable to add zone {name}: {err}"); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                     ResourceAction::Deleted => { | ||||
|                         if ent.data.domain_name.as_ref().is_some() { | ||||
|                             zones.delete_zone(&ent.name).await; | ||||
|                         } | ||||
|                     } | ||||
|                     ResourceAction::Modified => { | ||||
|                         todo!(); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             Err(err) => { | ||||
|                 tracing::error!("Error getting events: {err}"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// Hydrates all existing DNS zones.
 | ||||
| async fn hydrate_zones(zones: ZoneData, api_client: NazrinClient) -> anyhow::Result<()> { | ||||
|     tracing::info!("Hydrating initial zones..."); | ||||
|     let subnets = api_client | ||||
|         .get_subnets(nzr_api::default_ctx()) | ||||
|         .await | ||||
|         .context("RPC error getting subnets")? | ||||
|         .map_err(|e| anyhow::anyhow!("API error getting subnets: {e}"))?; | ||||
| 
 | ||||
|     let instances = api_client | ||||
|         .get_instances(nzr_api::default_ctx(), false) | ||||
|         .await | ||||
|         .context("RPC error getting instances")? | ||||
|         .map_err(|e| anyhow::anyhow!("API error getting instances: {e}"))?; | ||||
| 
 | ||||
|     for subnet in subnets { | ||||
|         if let Err(err) = zones.new_zone(&subnet.name, &subnet.data).await { | ||||
|             tracing::warn!("Couldn't create zone for {}: {err}", &subnet.name); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     for instance in instances { | ||||
|         if let Err(err) = zones.new_record(&instance).await { | ||||
|             tracing::warn!("Couldn't create zone entry for {}: {err}", &instance.name); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main() -> ExitCode { | ||||
|     tracing_subscriber::fmt::init(); | ||||
|     let cfg: Config = match Config::figment().extract() { | ||||
|         Ok(cfg) => cfg, | ||||
|         Err(err) => { | ||||
|             tracing::error!("Error parsing config: {err}"); | ||||
|             return ExitCode::FAILURE; | ||||
|         } | ||||
|     }; | ||||
| 
 | ||||
|     let api_client = { | ||||
|         let sock = match UnixStream::connect(&cfg.rpc.socket_path).await { | ||||
|             Ok(sock) => sock, | ||||
|             Err(err) => { | ||||
|                 tracing::error!("Connection to nzrd failed: {err}"); | ||||
|                 return ExitCode::FAILURE; | ||||
|             } | ||||
|         }; | ||||
|         nzr_api::new_client(sock) | ||||
|     }; | ||||
|     let events = { | ||||
|         let sock = match UnixStream::connect(&cfg.rpc.events_sock).await { | ||||
|             Ok(sock) => sock, | ||||
|             Err(err) => { | ||||
|                 tracing::error!("Connections to events stream failed: {err}"); | ||||
|                 return ExitCode::FAILURE; | ||||
|             } | ||||
|         }; | ||||
|         nzr_api::event::client::EventClient::new(sock) | ||||
|     }; | ||||
| 
 | ||||
|     let zones = ZoneData::new(&cfg.dns); | ||||
| 
 | ||||
|     if let Err(err) = hydrate_zones(zones.clone(), api_client.clone()).await { | ||||
|         tracing::error!("{err}"); | ||||
|         return ExitCode::FAILURE; | ||||
|     } | ||||
| 
 | ||||
|     let mut dns_listener = ServerFuture::new(zones.catalog()); | ||||
|     let dns_socket = { | ||||
|         let Ok(dns_ip) = cfg.dns.listen_addr.parse::<IpAddr>() else { | ||||
|             tracing::error!("Unable to parse listen_addr"); | ||||
|             return ExitCode::FAILURE; | ||||
|         }; | ||||
| 
 | ||||
|         match UdpSocket::bind((dns_ip, cfg.dns.port)).await { | ||||
|             Ok(sock) => sock, | ||||
|             Err(err) => { | ||||
|                 tracing::error!("Couldn't bind to {dns_ip}:{}: {err}", cfg.dns.port); | ||||
|                 return ExitCode::FAILURE; | ||||
|             } | ||||
|         } | ||||
|     }; | ||||
|     dns_listener.register_socket(dns_socket); | ||||
| 
 | ||||
|     tokio::select! { | ||||
|         _ = event_handler(zones.clone(), events) => { | ||||
|             todo!(); | ||||
|         }, | ||||
|         res = dns_listener.block_until_done() => { | ||||
|             if let Err(err) = res { | ||||
|                 tracing::error!("Error from DNS: {err}"); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     ExitCode::SUCCESS | ||||
| } | ||||
		Loading…
	
		Reference in a new issue