Compare commits
12 commits
0cb3aea62e
...
40532c9e36
Author | SHA1 | Date | |
---|---|---|---|
snow flurry | 40532c9e36 | ||
snow flurry | ece1f9a089 | ||
snow flurry | ba86368591 | ||
snow flurry | 42fad4920a | ||
snow flurry | 1d97134839 | ||
snow flurry | 459682d182 | ||
snow flurry | f0772b10e2 | ||
snow flurry | f1dd375e2f | ||
snow flurry | 6fe1ed02aa | ||
snow flurry | f63626489d | ||
snow flurry | d6eca32bc0 | ||
snow flurry | 19a08abb52 |
75
Cargo.lock
generated
75
Cargo.lock
generated
|
@ -1046,17 +1046,6 @@ dependencies = [
|
|||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hostname"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"windows",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "1.1.0"
|
||||
|
@ -1532,15 +1521,6 @@ dependencies = [
|
|||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_threads"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nzr"
|
||||
version = "0.9.0"
|
||||
|
@ -1568,9 +1548,12 @@ dependencies = [
|
|||
"log",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tarpc",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-serde 0.9.0",
|
||||
"tracing",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
|
@ -1604,7 +1587,6 @@ dependencies = [
|
|||
"home",
|
||||
"libc",
|
||||
"libsqlite3-sys",
|
||||
"log",
|
||||
"nix",
|
||||
"nzr-api",
|
||||
"nzr-virt",
|
||||
|
@ -1616,12 +1598,13 @@ dependencies = [
|
|||
"serde_with",
|
||||
"serde_yaml",
|
||||
"stdext",
|
||||
"syslog",
|
||||
"tarpc",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-serde 0.9.0",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"trait-variant",
|
||||
"uuid",
|
||||
"zerocopy",
|
||||
|
@ -1641,6 +1624,21 @@ dependencies = [
|
|||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nzrdns"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"futures",
|
||||
"hickory-proto",
|
||||
"hickory-server",
|
||||
"nzr-api",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.36.2"
|
||||
|
@ -2291,18 +2289,6 @@ version = "1.0.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"
|
||||
|
||||
[[package]]
|
||||
name = "syslog"
|
||||
version = "7.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "019f1500a13379b7d051455df397c75770de6311a7a188a699499502704d9f10"
|
||||
dependencies = [
|
||||
"hostname",
|
||||
"libc",
|
||||
"log",
|
||||
"time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tabled"
|
||||
version = "0.15.0"
|
||||
|
@ -2426,9 +2412,7 @@ checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885"
|
|||
dependencies = [
|
||||
"deranged",
|
||||
"itoa",
|
||||
"libc",
|
||||
"num-conv",
|
||||
"num_threads",
|
||||
"powerfmt",
|
||||
"serde",
|
||||
"time-core",
|
||||
|
@ -2942,25 +2926,6 @@ version = "0.4.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "windows"
|
||||
version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
|
||||
dependencies = [
|
||||
"windows-core",
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-core"
|
||||
version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
|
||||
dependencies = [
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.42.0"
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
[workspace]
|
||||
members = ["nzrd", "nzr-api", "client", "nzrdhcp", "nzr-virt", "omyacid"]
|
||||
members = ["nzrd", "nzr-api", "client", "nzrdhcp", "nzr-virt", "omyacid", "nzrdns"]
|
||||
resolver = "2"
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use clap::{CommandFactory, FromArgMatches, Parser, Subcommand};
|
||||
use nzr_api::config;
|
||||
use nzr_api::error::Simplify;
|
||||
use nzr_api::hickory_proto::rr::Name;
|
||||
use nzr_api::model;
|
||||
use nzr_api::net::cidr::CidrV4;
|
||||
|
@ -332,9 +333,9 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
|
|||
let mut net = client
|
||||
.get_subnets(nzr_api::default_ctx())
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
.simplify()
|
||||
.and_then(|res| {
|
||||
res?.iter()
|
||||
res.iter()
|
||||
.find_map(|ent| {
|
||||
if ent.name == args.name {
|
||||
Some(ent.clone())
|
||||
|
@ -342,7 +343,7 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
|
|||
None
|
||||
}
|
||||
})
|
||||
.ok_or_else(|| format!("Couldn't find network {}", &args.name))
|
||||
.ok_or_else(|| format!("Couldn't find network {}", &args.name).into())
|
||||
})?;
|
||||
|
||||
// merge in the new args
|
||||
|
@ -365,15 +366,11 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
|
|||
}
|
||||
|
||||
// run the update
|
||||
client
|
||||
let net = client
|
||||
.modify_subnet(nzr_api::default_ctx(), net)
|
||||
.await
|
||||
.map_err(|err| format!("RPC error: {}", err))
|
||||
.and_then(|res| {
|
||||
res.map(|e| {
|
||||
println!("Subnet {} updated.", e.name);
|
||||
})
|
||||
})?;
|
||||
.simplify()?;
|
||||
println!("Subnet {} updated.", net.name);
|
||||
}
|
||||
NetCmd::Dump { name } => {
|
||||
let subnets = (client.get_subnets(nzr_api::default_ctx()).await?)?;
|
||||
|
|
|
@ -17,14 +17,17 @@ uuid = { version = "1.2.2", features = ["serde"] }
|
|||
hickory-proto = { version = "0.24", features = ["serde-config"] }
|
||||
log = "0.4.17"
|
||||
diesel = { version = "2.2", optional = true }
|
||||
futures = { version = "0.3", optional = true }
|
||||
futures = "0.3"
|
||||
thiserror = "1"
|
||||
regex = "1"
|
||||
lazy_static = "1"
|
||||
tracing = "0.1"
|
||||
tokio-serde = { version = "0.9", features = ["bincode"] }
|
||||
serde_json = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
uuid = { version = "1.2.2", features = ["serde", "v4"] }
|
||||
|
||||
[features]
|
||||
diesel = ["dep:diesel"]
|
||||
mock = ["dep:futures"]
|
||||
mock = []
|
||||
|
|
|
@ -72,6 +72,7 @@ impl CloudConfig {
|
|||
pub struct RPCConfig {
|
||||
pub socket_path: PathBuf,
|
||||
pub admin_group: Option<String>,
|
||||
pub events_sock: PathBuf,
|
||||
}
|
||||
|
||||
/// The root configuration struct.
|
||||
|
@ -98,6 +99,7 @@ impl Default for Config {
|
|||
rpc: RPCConfig {
|
||||
socket_path: PathBuf::from("/var/run/nazrin/nzrd.sock"),
|
||||
admin_group: None,
|
||||
events_sock: PathBuf::from("/var/run/nazrin/events.sock"),
|
||||
},
|
||||
db_uri: "sqlite:/var/lib/nazrin/main_sql.db".to_owned(),
|
||||
libvirt_uri: match std::env::var("LIBVIRT_URI") {
|
||||
|
|
208
nzr-api/src/error.rs
Normal file
208
nzr-api/src/error.rs
Normal file
|
@ -0,0 +1,208 @@
|
|||
use std::fmt;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tarpc::client::RpcError;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub enum ErrorType {
|
||||
/// Entity was not found.
|
||||
NotFound,
|
||||
/// Error occurred with a database call.
|
||||
Database,
|
||||
/// Error occurred in a libvirt call.
|
||||
VirtError,
|
||||
/// Error occurred while parsing input.
|
||||
Parse,
|
||||
/// An unknown API error occurred.
|
||||
Other,
|
||||
}
|
||||
|
||||
impl ErrorType {
|
||||
fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
ErrorType::NotFound => "Entity not found",
|
||||
ErrorType::Database => "Database error",
|
||||
ErrorType::VirtError => "libvirt error",
|
||||
ErrorType::Parse => "Unable to parse input",
|
||||
ErrorType::Other => "Unknown API error",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ErrorType {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.as_str().fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ApiError {
|
||||
error_type: ErrorType,
|
||||
message: Option<String>,
|
||||
inner: Option<InnerError>,
|
||||
}
|
||||
|
||||
impl fmt::Display for ApiError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.message
|
||||
.as_deref()
|
||||
.unwrap_or_else(|| self.error_type.as_str())
|
||||
.fmt(f)?;
|
||||
if let Some(inner) = &self.inner {
|
||||
write!(f, ": {inner}")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ApiError {}
|
||||
|
||||
impl ApiError {
|
||||
pub fn new<E>(error_type: ErrorType, message: impl Into<String>, err: E) -> Self
|
||||
where
|
||||
E: std::error::Error,
|
||||
{
|
||||
Self {
|
||||
error_type,
|
||||
message: Some(message.into()),
|
||||
inner: Some(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn err_type(&self) -> ErrorType {
|
||||
self.error_type
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ErrorType> for ApiError {
|
||||
fn from(value: ErrorType) -> Self {
|
||||
Self {
|
||||
error_type: value,
|
||||
message: None,
|
||||
inner: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<T> for ApiError
|
||||
where
|
||||
T: AsRef<str>,
|
||||
{
|
||||
fn from(value: T) -> Self {
|
||||
let value = value.as_ref();
|
||||
Self {
|
||||
error_type: ErrorType::Other,
|
||||
message: Some(value.to_owned()),
|
||||
inner: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct InnerError {
|
||||
error_debug: String,
|
||||
error_message: String,
|
||||
}
|
||||
|
||||
impl fmt::Debug for InnerError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.error_debug.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for InnerError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.error_message.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> From<E> for InnerError
|
||||
where
|
||||
E: std::error::Error,
|
||||
{
|
||||
fn from(value: E) -> Self {
|
||||
Self {
|
||||
error_debug: format!("{value:?}"),
|
||||
error_message: format!("{value}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ToApiResult<T> {
|
||||
/// Converts the result's error type to [`ApiError`] with [`ErrorType::Other`].
|
||||
///
|
||||
/// [`ApiError`]: nzr-api::error::ApiError
|
||||
/// [`ErrorType::Other`]: nzr-api::error::ErrorType::Other
|
||||
fn to_api(self) -> Result<T, ApiError>;
|
||||
/// Converts the result's error type to [`ApiError`] with
|
||||
/// [`ErrorType::Other`], and the given context.
|
||||
///
|
||||
/// [`ApiError`]: nzr-api::error::ApiError
|
||||
/// [`ErrorType::Other`]: nzr-api::error::ErrorType::Other
|
||||
fn to_api_with(self, context: impl AsRef<str>) -> Result<T, ApiError>;
|
||||
/// Converts the result's error type to [`ApiError`] with the given
|
||||
/// [`ErrorType`] and context.
|
||||
///
|
||||
/// [`ApiError`]: nzr-api::error::ApiError
|
||||
/// [`ErrorType`]: nzr-api::error::ErrorType
|
||||
fn to_api_with_type(self, err_type: ErrorType, context: impl AsRef<str>)
|
||||
-> Result<T, ApiError>;
|
||||
/// Converts the result's error type to [`ApiError`] with the given
|
||||
/// [`ErrorType`].
|
||||
///
|
||||
/// [`ApiError`]: nzr-api::error::ApiError
|
||||
/// [`ErrorType`]: nzr-api::error::ErrorType
|
||||
fn to_api_type(self, err_type: ErrorType) -> Result<T, ApiError>;
|
||||
}
|
||||
|
||||
impl<T, E> ToApiResult<T> for Result<T, E>
|
||||
where
|
||||
E: std::error::Error + 'static,
|
||||
{
|
||||
fn to_api(self) -> Result<T, ApiError> {
|
||||
self.map_err(|e| ApiError {
|
||||
error_type: ErrorType::Other,
|
||||
message: None,
|
||||
inner: Some(e.into()),
|
||||
})
|
||||
}
|
||||
|
||||
fn to_api_with(self, context: impl AsRef<str>) -> Result<T, ApiError> {
|
||||
self.map_err(|e| ApiError {
|
||||
error_type: ErrorType::Other,
|
||||
message: Some(context.as_ref().to_owned()),
|
||||
inner: Some(e.into()),
|
||||
})
|
||||
}
|
||||
|
||||
fn to_api_type(self, err_type: ErrorType) -> Result<T, ApiError> {
|
||||
self.map_err(|e| ApiError {
|
||||
error_type: err_type,
|
||||
message: None,
|
||||
inner: Some(e.into()),
|
||||
})
|
||||
}
|
||||
|
||||
fn to_api_with_type(
|
||||
self,
|
||||
err_type: ErrorType,
|
||||
context: impl AsRef<str>,
|
||||
) -> Result<T, ApiError> {
|
||||
self.map_err(|e| ApiError {
|
||||
error_type: err_type,
|
||||
message: Some(context.as_ref().to_owned()),
|
||||
inner: Some(e.into()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Simplify<T> {
|
||||
fn simplify(self) -> Result<T, ApiError>;
|
||||
}
|
||||
|
||||
impl<T> Simplify<T> for Result<Result<T, ApiError>, RpcError> {
|
||||
/// Flattens a Result of `RpcError` and `ApiError` to just `ApiError`.
|
||||
fn simplify(self) -> Result<T, ApiError> {
|
||||
self.to_api_with("RPC Error").and_then(|r| r)
|
||||
}
|
||||
}
|
53
nzr-api/src/event/client.rs
Normal file
53
nzr-api/src/event/client.rs
Normal file
|
@ -0,0 +1,53 @@
|
|||
use std::{pin::Pin, task::Poll};
|
||||
|
||||
use futures::{Stream, TryStreamExt};
|
||||
use tarpc::tokio_util::codec::{FramedRead, LengthDelimitedCodec};
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
use super::{EventError, EventMessage};
|
||||
|
||||
/// Client for receiving various events emitted by Nazrin.
|
||||
pub struct EventClient<T>
|
||||
where
|
||||
T: AsyncRead,
|
||||
{
|
||||
transport: Pin<Box<FramedRead<T, LengthDelimitedCodec>>>,
|
||||
}
|
||||
|
||||
impl<T> EventClient<T>
|
||||
where
|
||||
T: AsyncRead,
|
||||
{
|
||||
/// Creates a new EventClient.
|
||||
pub fn new(inner: T) -> Self {
|
||||
let transport = FramedRead::new(inner, LengthDelimitedCodec::new());
|
||||
Self {
|
||||
transport: Box::pin(transport),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for EventClient<T>
|
||||
where
|
||||
T: AsyncRead,
|
||||
{
|
||||
type Item = Result<EventMessage, EventError>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
match self.as_mut().transport.try_poll_next_unpin(cx) {
|
||||
Poll::Ready(res) => {
|
||||
let our_res = res.map(|res| {
|
||||
res.map_err(|e| e.into()).and_then(|bytes| {
|
||||
let msg: EventMessage = serde_json::from_slice(&bytes)?;
|
||||
Ok(msg)
|
||||
})
|
||||
});
|
||||
Poll::Ready(our_res)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
77
nzr-api/src/event/mod.rs
Normal file
77
nzr-api/src/event/mod.rs
Normal file
|
@ -0,0 +1,77 @@
|
|||
pub mod client;
|
||||
pub mod server;
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
|
||||
use std::io;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::model;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum ResourceAction {
|
||||
/// The referenced resource was created.
|
||||
Created,
|
||||
/// The referenced resource was deleted, and is no longer available.
|
||||
Deleted,
|
||||
/// The referenced resource was modified in some way.
|
||||
Modified,
|
||||
}
|
||||
|
||||
/// Represents an event pertaining to a specific action.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct ResourceEvent<T> {
|
||||
pub action: ResourceAction,
|
||||
/// The entity that was acted upon.
|
||||
pub entity: T,
|
||||
}
|
||||
|
||||
/// Represents any event that is emitted by Nazrin.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(tag = "event")]
|
||||
pub enum EventMessage {
|
||||
/// A subnet was created, modified, or deleted.
|
||||
Subnet(ResourceEvent<model::Subnet>),
|
||||
/// An instance was created, modified, or deleted.
|
||||
Instance(ResourceEvent<model::Instance>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum EventError {
|
||||
#[error("Transport error: {0}")]
|
||||
Transport(#[from] io::Error),
|
||||
#[error("Serialization error: {0}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
}
|
||||
|
||||
pub trait Emittable {
|
||||
fn as_event(&self, action: ResourceAction) -> EventMessage;
|
||||
}
|
||||
|
||||
macro_rules! emittable {
|
||||
($t:ty, $msg:ident) => {
|
||||
impl Emittable for $t {
|
||||
fn as_event(&self, action: ResourceAction) -> EventMessage {
|
||||
EventMessage::$msg(ResourceEvent {
|
||||
action,
|
||||
entity: self.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
emittable!(model::Instance, Instance);
|
||||
emittable!(model::Subnet, Subnet);
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! nzr_event {
|
||||
($srv:expr, $act:ident, $ent:tt) => {{
|
||||
use $crate::event::Emittable;
|
||||
|
||||
$srv.emit($ent.as_event($crate::event::ResourceAction::$act))
|
||||
.await
|
||||
}};
|
||||
}
|
191
nzr-api/src/event/server.rs
Normal file
191
nzr-api/src/event/server.rs
Normal file
|
@ -0,0 +1,191 @@
|
|||
use futures::Future;
|
||||
use std::{fmt, io, pin::Pin};
|
||||
|
||||
use futures::SinkExt;
|
||||
use tarpc::tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
|
||||
use tokio::{
|
||||
io::AsyncWrite,
|
||||
sync::broadcast::{self, Receiver, Sender},
|
||||
};
|
||||
use tracing::instrument;
|
||||
|
||||
use super::EventMessage;
|
||||
|
||||
/// Representation of multiple types of SocketAddrs, because you can't have just
|
||||
/// one!
|
||||
#[derive(Debug)]
|
||||
pub enum SocketAddr {
|
||||
#[cfg(unix)]
|
||||
TokioUnix(tokio::net::unix::SocketAddr),
|
||||
#[cfg(unix)]
|
||||
Unix(std::os::unix::net::SocketAddr),
|
||||
Net(std::net::SocketAddr),
|
||||
#[cfg(test)]
|
||||
None,
|
||||
}
|
||||
|
||||
impl fmt::Display for SocketAddr {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
#[cfg(unix)]
|
||||
Self::TokioUnix(addr) => std::fmt::Debug::fmt(addr, f),
|
||||
#[cfg(unix)]
|
||||
Self::Unix(addr) => std::fmt::Debug::fmt(addr, f),
|
||||
Self::Net(addr) => addr.fmt(f),
|
||||
#[cfg(test)]
|
||||
Self::None => write!(f, "mock client"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! as_sockaddr {
|
||||
($id:ident, $t:ty) => {
|
||||
impl From<$t> for SocketAddr {
|
||||
fn from(value: $t) -> Self {
|
||||
Self::$id(value)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
as_sockaddr!(TokioUnix, tokio::net::unix::SocketAddr);
|
||||
#[cfg(unix)]
|
||||
as_sockaddr!(Unix, std::os::unix::net::SocketAddr);
|
||||
as_sockaddr!(Net, std::net::SocketAddr);
|
||||
|
||||
/// Represents a connection to a client. Instead of being owned by the server
|
||||
/// struct, a [`tokio::sync::broadcast::Receiver`] is used to get the serialized
|
||||
/// message and pass it to the client.
|
||||
///
|
||||
/// [`tokio::sync::broadcast::Receiver`]: tokio::sync::broadcast::Receiver
|
||||
struct EventEmitter<T>
|
||||
where
|
||||
T: AsyncWrite + Send + 'static,
|
||||
{
|
||||
transport: Pin<Box<FramedWrite<T, LengthDelimitedCodec>>>,
|
||||
client_addr: SocketAddr,
|
||||
channel: Receiver<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl<T> EventEmitter<T>
|
||||
where
|
||||
T: AsyncWrite + Send + 'static,
|
||||
{
|
||||
fn new(inner: T, client_addr: SocketAddr, channel: Receiver<Vec<u8>>) -> Self {
|
||||
let transport = FramedWrite::new(inner, LengthDelimitedCodec::new());
|
||||
Self {
|
||||
transport: Box::pin(transport),
|
||||
client_addr,
|
||||
channel,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self), fields(client = %self.client_addr))]
|
||||
async fn handler(&mut self) -> bool {
|
||||
match self.channel.recv().await {
|
||||
Ok(msg) => {
|
||||
if let Err(err) = self.transport.send(msg.into()).await {
|
||||
tracing::error!("Couldn't write to client: {err}");
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("IPC error: {err}");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn run(mut self) {
|
||||
tokio::spawn(async move { while self.handler().await {} });
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles the creation and sending of events to clients.
|
||||
pub struct EventServer {
|
||||
channel: Sender<Vec<u8>>,
|
||||
}
|
||||
|
||||
// TODO: consider letting this be configurable
|
||||
const MAX_RECEIVERS: usize = 16;
|
||||
|
||||
impl EventServer {
|
||||
/// Creates a new EventServer.
|
||||
pub fn new() -> Self {
|
||||
let (channel, _) = broadcast::channel(MAX_RECEIVERS);
|
||||
Self { channel }
|
||||
}
|
||||
|
||||
/// Returns a future that returns [`Poll::Pending`] until the client count falls below the threshold.
|
||||
pub fn until_available(&self) -> EventServerAvailability<'_> {
|
||||
EventServerAvailability { parent: self }
|
||||
}
|
||||
|
||||
/// Whether we're able to take connections.
|
||||
#[inline]
|
||||
fn is_available(&self) -> bool {
|
||||
self.channel.receiver_count() < MAX_RECEIVERS
|
||||
}
|
||||
|
||||
/// Spawns a new [`EventEmitter`] where events will be sent to.
|
||||
pub async fn spawn<T: AsyncWrite + Send + 'static>(
|
||||
&self,
|
||||
inner: T,
|
||||
client_addr: impl Into<SocketAddr>,
|
||||
) -> io::Result<()> {
|
||||
// Sender<T> doesn't have a try_subscribe, so this is our last-ditch
|
||||
// effort to avoid a panic
|
||||
if !self.is_available() {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "Too many connections"));
|
||||
}
|
||||
|
||||
EventEmitter::new(inner, client_addr.into(), self.channel.subscribe()).run();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send the given event to all connected clients.
|
||||
pub async fn emit(&self, msg: EventMessage) {
|
||||
let bytes = match serde_json::to_vec(&msg) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to serialize: {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if self.channel.send(bytes).is_err() {
|
||||
tracing::debug!("Tried to emit an event, but no clients were around to hear it");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EventServer {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EventServerAvailability<'a> {
|
||||
parent: &'a EventServer,
|
||||
}
|
||||
|
||||
impl<'a> Future for EventServerAvailability<'a> {
|
||||
type Output = ();
|
||||
|
||||
fn poll(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
use std::task::Poll;
|
||||
|
||||
if self.parent.is_available() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
43
nzr-api/src/event/test.rs
Normal file
43
nzr-api/src/event/test.rs
Normal file
|
@ -0,0 +1,43 @@
|
|||
use std::str::FromStr;
|
||||
|
||||
use futures::StreamExt;
|
||||
|
||||
use crate::{
|
||||
event::{server::SocketAddr, EventMessage, ResourceAction},
|
||||
net::cidr::CidrV4,
|
||||
nzr_event,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn event_serde() {
|
||||
let (rx, tx) = tokio::io::duplex(1024);
|
||||
let server = super::server::EventServer::default();
|
||||
server.spawn(tx, SocketAddr::None).await.unwrap();
|
||||
let mut client = super::client::EventClient::new(rx);
|
||||
let net = CidrV4::from_str("192.0.2.0/24").unwrap();
|
||||
let some_subnet = crate::model::Subnet {
|
||||
name: "whatever".into(),
|
||||
data: crate::model::SubnetData {
|
||||
ifname: "eth0".into(),
|
||||
network: net,
|
||||
start_host: net.make_ip(10).unwrap(),
|
||||
end_host: net.make_ip(254).unwrap(),
|
||||
gateway4: Some(net.make_ip(1).unwrap()),
|
||||
dns: Vec::new(),
|
||||
domain_name: Some("homestarrunnner.net".parse().unwrap()),
|
||||
vlan_id: None,
|
||||
},
|
||||
};
|
||||
nzr_event!(server, Created, some_subnet);
|
||||
let next = client
|
||||
.next()
|
||||
.await
|
||||
.expect("client must receive message")
|
||||
.unwrap();
|
||||
|
||||
let EventMessage::Subnet(net) = next else {
|
||||
panic!("Unexpected event received: {next:?}");
|
||||
};
|
||||
|
||||
assert_eq!(net.action, ResourceAction::Created);
|
||||
}
|
|
@ -1,9 +1,12 @@
|
|||
use std::net::Ipv4Addr;
|
||||
|
||||
use error::ApiError;
|
||||
use model::{CreateStatus, Instance, SshPubkey, Subnet};
|
||||
|
||||
pub mod args;
|
||||
pub mod config;
|
||||
pub mod error;
|
||||
pub mod event;
|
||||
#[cfg(feature = "mock")]
|
||||
pub mod mock;
|
||||
pub mod model;
|
||||
|
@ -23,40 +26,40 @@ pub enum InstanceQuery {
|
|||
#[tarpc::service]
|
||||
pub trait Nazrin {
|
||||
/// Creates a new instance.
|
||||
async fn new_instance(build_args: args::NewInstance) -> Result<uuid::Uuid, String>;
|
||||
async fn new_instance(build_args: args::NewInstance) -> Result<uuid::Uuid, ApiError>;
|
||||
/// Poll for the current status of an instance being created.
|
||||
async fn poll_new_instance(task_id: uuid::Uuid) -> Option<CreateStatus>;
|
||||
/// Deletes an existing instance.
|
||||
///
|
||||
/// This should involve deleting all related disks and clearing
|
||||
/// the lease information from the subnet data, if any.
|
||||
async fn delete_instance(name: String) -> Result<(), String>;
|
||||
async fn delete_instance(name: String) -> Result<(), ApiError>;
|
||||
/// Gets a single instance by the given InstanceQuery.
|
||||
async fn find_instance(query: InstanceQuery) -> Result<Option<Instance>, String>;
|
||||
async fn find_instance(query: InstanceQuery) -> Result<Option<Instance>, ApiError>;
|
||||
/// Gets a list of existing instances.
|
||||
async fn get_instances(with_status: bool) -> Result<Vec<Instance>, String>;
|
||||
async fn get_instances(with_status: bool) -> Result<Vec<Instance>, ApiError>;
|
||||
/// Cleans up unusable entries in the database.
|
||||
async fn garbage_collect() -> Result<(), String>;
|
||||
async fn garbage_collect() -> Result<(), ApiError>;
|
||||
/// Creates a new subnet.
|
||||
///
|
||||
/// Unlike instances, subnets shouldn't perform any changes to the
|
||||
/// interfaces they reference. This should be used primarily for
|
||||
/// ease-of-use and bookkeeping (e.g., assigning dynamic leases).
|
||||
async fn new_subnet(build_args: Subnet) -> Result<Subnet, String>;
|
||||
async fn new_subnet(build_args: Subnet) -> Result<Subnet, ApiError>;
|
||||
/// Modifies an existing subnet.
|
||||
async fn modify_subnet(edit_args: Subnet) -> Result<Subnet, String>;
|
||||
async fn modify_subnet(edit_args: Subnet) -> Result<Subnet, ApiError>;
|
||||
/// Gets a list of existing subnets.
|
||||
async fn get_subnets() -> Result<Vec<Subnet>, String>;
|
||||
async fn get_subnets() -> Result<Vec<Subnet>, ApiError>;
|
||||
/// Deletes an existing subnet.
|
||||
async fn delete_subnet(interface: String) -> Result<(), String>;
|
||||
async fn delete_subnet(interface: String) -> Result<(), ApiError>;
|
||||
/// Gets the cloud-init user-data for the given instance.
|
||||
async fn get_instance_userdata(id: i32) -> Result<Vec<u8>, String>;
|
||||
async fn get_instance_userdata(id: i32) -> Result<Vec<u8>, ApiError>;
|
||||
/// Gets all SSH keys stored in the database.
|
||||
async fn get_ssh_pubkeys() -> Result<Vec<SshPubkey>, String>;
|
||||
async fn get_ssh_pubkeys() -> Result<Vec<SshPubkey>, ApiError>;
|
||||
/// Adds a new SSH public key to the database.
|
||||
async fn add_ssh_pubkey(pub_key: String) -> Result<SshPubkey, String>;
|
||||
async fn add_ssh_pubkey(pub_key: String) -> Result<SshPubkey, ApiError>;
|
||||
/// Deletes an SSH public key from the database.
|
||||
async fn delete_ssh_pubkey(id: i32) -> Result<(), String>;
|
||||
async fn delete_ssh_pubkey(id: i32) -> Result<(), ApiError>;
|
||||
}
|
||||
|
||||
/// Create a new NazrinClient.
|
||||
|
|
|
@ -1,20 +1,20 @@
|
|||
use std::net::Ipv4Addr;
|
||||
|
||||
use crate::{args, model, net::cidr::CidrV4};
|
||||
use crate::{args, error::ApiError, model, net::cidr::CidrV4};
|
||||
|
||||
pub trait NzrClientExt {
|
||||
#[allow(async_fn_in_trait)]
|
||||
async fn new_mock_instance(
|
||||
&mut self,
|
||||
name: impl AsRef<str>,
|
||||
) -> Result<Result<model::Instance, String>, crate::RpcError>;
|
||||
) -> Result<Result<model::Instance, ApiError>, crate::RpcError>;
|
||||
}
|
||||
|
||||
impl NzrClientExt for crate::NazrinClient {
|
||||
async fn new_mock_instance(
|
||||
&mut self,
|
||||
name: impl AsRef<str>,
|
||||
) -> Result<Result<model::Instance, String>, crate::RpcError> {
|
||||
) -> Result<Result<model::Instance, ApiError>, crate::RpcError> {
|
||||
let name = name.as_ref().to_owned();
|
||||
|
||||
let subnet = self
|
||||
|
|
|
@ -10,6 +10,7 @@ use futures::{future, StreamExt};
|
|||
use tokio::{sync::RwLock, task::JoinHandle};
|
||||
|
||||
use crate::{
|
||||
error::{ApiError, ErrorType},
|
||||
model,
|
||||
net::{cidr::CidrV4, mac::MacAddr},
|
||||
InstanceQuery, Nazrin, NazrinClient,
|
||||
|
@ -62,14 +63,14 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
build_args: crate::args::NewInstance,
|
||||
) -> Result<uuid::Uuid, String> {
|
||||
) -> Result<uuid::Uuid, ApiError> {
|
||||
let mut db = self.db.write().await;
|
||||
let Some(net_pos) = db
|
||||
.subnets
|
||||
.iter()
|
||||
.position(|s| s.as_ref().filter(|s| s.name == build_args.subnet).is_some())
|
||||
else {
|
||||
return Err("Subnet doesn't exist".to_owned());
|
||||
return Err("Subnet doesn't exist".into());
|
||||
};
|
||||
let subnet = db.subnets[net_pos].as_ref().unwrap().clone();
|
||||
let cur_lease = *(db
|
||||
|
@ -134,7 +135,11 @@ impl Nazrin for MockServer {
|
|||
}
|
||||
}
|
||||
|
||||
async fn delete_instance(self, _: tarpc::context::Context, name: String) -> Result<(), String> {
|
||||
async fn delete_instance(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
name: String,
|
||||
) -> Result<(), ApiError> {
|
||||
let mut db = self.db.write().await;
|
||||
let Some(inst) = db
|
||||
.instances
|
||||
|
@ -142,7 +147,7 @@ impl Nazrin for MockServer {
|
|||
.find(|i| i.as_ref().filter(|i| i.name == name).is_some())
|
||||
.take()
|
||||
else {
|
||||
return Err("Instance doesn't exist".to_owned());
|
||||
return Err("Instance doesn't exist".into());
|
||||
};
|
||||
inst.take();
|
||||
Ok(())
|
||||
|
@ -152,7 +157,7 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
query: crate::InstanceQuery,
|
||||
) -> Result<Option<crate::model::Instance>, String> {
|
||||
) -> Result<Option<crate::model::Instance>, ApiError> {
|
||||
let db = self.db.read().await;
|
||||
|
||||
let res = {
|
||||
|
@ -177,7 +182,7 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
id: i32,
|
||||
) -> Result<Vec<u8>, String> {
|
||||
) -> Result<Vec<u8>, ApiError> {
|
||||
let db = self.db.read().await;
|
||||
let Some(inst) = db
|
||||
.instances
|
||||
|
@ -185,7 +190,7 @@ impl Nazrin for MockServer {
|
|||
.find(|i| i.as_ref().map(|i| i.id == id).is_some())
|
||||
.and_then(|o| o.as_ref())
|
||||
else {
|
||||
return Err("No such instance".to_owned());
|
||||
return Err("No such instance".into());
|
||||
};
|
||||
Ok(db.ci_userdatas.get(&inst.name).cloned().unwrap_or_default())
|
||||
}
|
||||
|
@ -194,7 +199,7 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
_with_status: bool,
|
||||
) -> Result<Vec<crate::model::Instance>, String> {
|
||||
) -> Result<Vec<crate::model::Instance>, ApiError> {
|
||||
let db = self.db.read().await;
|
||||
Ok(db
|
||||
.instances
|
||||
|
@ -207,7 +212,7 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
build_args: crate::model::Subnet,
|
||||
) -> Result<crate::model::Subnet, String> {
|
||||
) -> Result<crate::model::Subnet, ApiError> {
|
||||
let mut db = self.db.write().await;
|
||||
let subnet = build_args.clone();
|
||||
db.subnets.push(Some(build_args));
|
||||
|
@ -218,14 +223,14 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
_edit_args: crate::model::Subnet,
|
||||
) -> Result<crate::model::Subnet, String> {
|
||||
) -> Result<crate::model::Subnet, ApiError> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn get_subnets(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
) -> Result<Vec<crate::model::Subnet>, String> {
|
||||
) -> Result<Vec<crate::model::Subnet>, ApiError> {
|
||||
let db = self.db.read().await;
|
||||
Ok(db.subnets.iter().filter_map(|net| net.clone()).collect())
|
||||
}
|
||||
|
@ -234,35 +239,40 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
interface: String,
|
||||
) -> Result<(), String> {
|
||||
) -> Result<(), ApiError> {
|
||||
let mut db = self.db.write().await;
|
||||
db.instances
|
||||
.iter()
|
||||
.filter_map(|inst| inst.as_ref())
|
||||
.for_each(|inst| {
|
||||
if inst.lease.subnet == interface {
|
||||
todo!("what now")
|
||||
}
|
||||
});
|
||||
let Some(subnet) = db
|
||||
.subnets
|
||||
.iter_mut()
|
||||
.find(|net| net.as_ref().filter(|n| n.name == interface).is_some())
|
||||
else {
|
||||
return Err("Subnet doesn't exist".to_owned());
|
||||
};
|
||||
subnet.take();
|
||||
{
|
||||
let Some(subnet) = db
|
||||
.subnets
|
||||
.iter_mut()
|
||||
.find(|net| net.as_ref().filter(|n| n.name == interface).is_some())
|
||||
else {
|
||||
return Err(ErrorType::NotFound.into());
|
||||
};
|
||||
subnet.take();
|
||||
}
|
||||
// Drop all instances that belong to this subnet
|
||||
db.instances.iter_mut().for_each(|inst| {
|
||||
if inst
|
||||
.as_mut()
|
||||
.filter(|inst| inst.lease.subnet != interface)
|
||||
.is_some()
|
||||
{
|
||||
inst.take();
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), String> {
|
||||
todo!()
|
||||
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), ApiError> {
|
||||
// no libvirt to compare against, no instances to GC
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_ssh_pubkeys(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
) -> Result<Vec<model::SshPubkey>, String> {
|
||||
) -> Result<Vec<model::SshPubkey>, ApiError> {
|
||||
let db = self.db.read().await;
|
||||
|
||||
Ok(db
|
||||
|
@ -276,7 +286,7 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
pub_key: String,
|
||||
) -> Result<model::SshPubkey, String> {
|
||||
) -> Result<model::SshPubkey, ApiError> {
|
||||
let mut key_model = model::SshPubkey::from_str(&pub_key).map_err(|e| e.to_string())?;
|
||||
let mut db = self.db.write().await;
|
||||
key_model.id = Some(db.ssh_keys.len() as i32);
|
||||
|
@ -284,7 +294,7 @@ impl Nazrin for MockServer {
|
|||
Ok(key_model)
|
||||
}
|
||||
|
||||
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), String> {
|
||||
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), ApiError> {
|
||||
let mut db = self.db.write().await;
|
||||
if let Some(key) = db.ssh_keys.get_mut(id as usize) {
|
||||
key.take();
|
||||
|
|
|
@ -5,7 +5,10 @@ use serde::{Deserialize, Serialize};
|
|||
use std::{fmt, net::Ipv4Addr};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::net::{cidr::CidrV4, mac::MacAddr};
|
||||
use crate::{
|
||||
error::ApiError,
|
||||
net::{cidr::CidrV4, mac::MacAddr},
|
||||
};
|
||||
|
||||
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
|
||||
#[repr(u32)]
|
||||
|
@ -67,7 +70,7 @@ impl fmt::Display for DomainState {
|
|||
pub struct CreateStatus {
|
||||
pub status_text: String,
|
||||
pub completion: f32,
|
||||
pub result: Option<Result<Instance, String>>,
|
||||
pub result: Option<Result<Instance, ApiError>>,
|
||||
}
|
||||
|
||||
/// Struct representing a VM instance.
|
||||
|
|
|
@ -26,9 +26,8 @@ tarpc = { version = "0.34", features = [
|
|||
] }
|
||||
|
||||
# Logging
|
||||
# TODO: switch to tracing?
|
||||
log = "0.4.17"
|
||||
syslog = "7"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
|
||||
# Database
|
||||
diesel = { version = "2.2", features = [
|
||||
|
|
|
@ -1,23 +1 @@
|
|||
pub mod net;
|
||||
pub mod vm;
|
||||
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CommandError(String);
|
||||
|
||||
impl fmt::Display for CommandError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for CommandError {}
|
||||
|
||||
macro_rules! cmd_error {
|
||||
($($arg:tt)*) => {
|
||||
Box::new(CommandError(format!($($arg)*)))
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) use cmd_error;
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
use super::*;
|
||||
use crate::ctx::Context;
|
||||
use crate::model::tx::Transaction;
|
||||
use crate::model::Subnet;
|
||||
use nzr_api::model;
|
||||
|
||||
pub async fn add_subnet(
|
||||
ctx: &Context,
|
||||
args: model::Subnet,
|
||||
) -> Result<Subnet, Box<dyn std::error::Error>> {
|
||||
let subnet = {
|
||||
let s = Subnet::insert(ctx, args.name, args.data)
|
||||
.await
|
||||
.map_err(|er| cmd_error!("Couldn't generate subnet: {}", er))?;
|
||||
Transaction::begin(ctx, s)
|
||||
};
|
||||
|
||||
if let Err(err) = ctx.zones.new_zone(&subnet).await {
|
||||
Err(cmd_error!("Failed to create new DNS zone: {}", err))
|
||||
} else {
|
||||
Ok(subnet.take())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_subnet(
|
||||
ctx: &Context,
|
||||
name: impl AsRef<str>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
match Subnet::get_by_name(ctx, name.as_ref())
|
||||
.await
|
||||
.map_err(|er| cmd_error!("Couldn't find subnet: {}", er))?
|
||||
{
|
||||
Some(subnet) => {
|
||||
if let Some(domain_name) = &subnet.domain_name {
|
||||
ctx.zones.delete_zone(domain_name).await;
|
||||
}
|
||||
|
||||
subnet
|
||||
.delete(ctx)
|
||||
.await
|
||||
.map_err(|er| cmd_error!("Couldn't fully delete subnet entry: {}", er))
|
||||
}
|
||||
None => Err(cmd_error!("Subnet not found")),
|
||||
}?;
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -1,3 +1,4 @@
|
|||
use nzr_api::error::{ApiError, ErrorType, ToApiResult};
|
||||
use nzr_api::net::cidr::CidrV4;
|
||||
use nzr_virt::error::DomainError;
|
||||
use nzr_virt::xml::build::DomainBuilder;
|
||||
|
@ -5,14 +6,14 @@ use nzr_virt::xml::{self, InfoMap, SerialType, Sysinfo};
|
|||
use nzr_virt::{datasize, dom, vol};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::*;
|
||||
use crate::ctrl::vm::Progress;
|
||||
use crate::ctx::Context;
|
||||
use crate::model::tx::Transaction;
|
||||
use crate::model::{Instance, Subnet};
|
||||
use log::{debug, info, warn};
|
||||
use nzr_api::args;
|
||||
use nzr_api::net::mac::MacAddr;
|
||||
use nzr_api::{args, model, nzr_event};
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
const VIRT_MAC_OUI: &[u8] = &[0x02, 0xf1, 0x0f];
|
||||
|
||||
|
@ -29,23 +30,21 @@ pub async fn new_instance(
|
|||
ctx: Context,
|
||||
prog_task: Arc<RwLock<Progress>>,
|
||||
args: &args::NewInstance,
|
||||
) -> Result<(Instance, dom::Domain), Box<dyn std::error::Error>> {
|
||||
) -> Result<(Instance, dom::Domain), ApiError> {
|
||||
progress!(prog_task, 0.0, "Starting...");
|
||||
// find the subnet corresponding to the interface
|
||||
let subnet = Subnet::get_by_name(&ctx, &args.subnet)
|
||||
.await
|
||||
.map_err(|er| cmd_error!("Unable to get interface: {}", er))?
|
||||
.ok_or(cmd_error!(
|
||||
"Subnet {} wasn't found in database",
|
||||
&args.subnet
|
||||
))?;
|
||||
.to_api_with("Unable to get interface")?
|
||||
.ok_or::<ApiError>(format!("Subnet {} wasn't found in database", &args.subnet).into())?;
|
||||
|
||||
// bail if a domain already exists
|
||||
if let Ok(dom) = ctx.virt.conn.get_instance(&args.name).await {
|
||||
Err(cmd_error!(
|
||||
Err(format!(
|
||||
"Domain with name already exists (uuid {})",
|
||||
dom.xml().await.uuid,
|
||||
))
|
||||
)
|
||||
.into())
|
||||
} else {
|
||||
// make sure the base image exists
|
||||
let mut base_image = ctx
|
||||
|
@ -54,7 +53,7 @@ pub async fn new_instance(
|
|||
.baseimg
|
||||
.volume(&args.base_image)
|
||||
.await
|
||||
.map_err(|er| cmd_error!("Couldn't find base image: {}", er))?;
|
||||
.to_api_with("Couldn't find base image")?;
|
||||
progress!(prog_task, 10.0, "Generating metadata...");
|
||||
|
||||
// generate a new lease with a new MAC addr
|
||||
|
@ -62,19 +61,23 @@ pub async fn new_instance(
|
|||
let bytes = [VIRT_MAC_OUI, rand::random::<[u8; 3]>().as_ref()].concat();
|
||||
MacAddr::from_bytes(bytes)
|
||||
}
|
||||
.map_err(|er| cmd_error!("Unable to create a new MAC address: {}", er))?;
|
||||
.to_api_with("Unable to create a new MAC address")?;
|
||||
|
||||
// Get highest host addr + 1 for our new addr
|
||||
let addr = {
|
||||
let addr_num = Instance::all_in_subnet(&ctx, &subnet)
|
||||
.await?
|
||||
.await
|
||||
.to_api_with("Couldn't get instances in subnet")?
|
||||
.into_iter()
|
||||
.max_by(|a, b| a.host_num.cmp(&b.host_num))
|
||||
.map_or(subnet.start_host, |i| i.host_num + 1);
|
||||
if addr_num > subnet.end_host || addr_num < subnet.start_host {
|
||||
Err(cmd_error!("Got invalid lease address for instance"))?;
|
||||
return Err("Got invalid lease address for instance".into());
|
||||
}
|
||||
let addr = subnet.network.make_ip(addr_num as u32)?;
|
||||
let addr = subnet
|
||||
.network
|
||||
.make_ip(addr_num as u32)
|
||||
.to_api_with("Unable to generate instance IP")?;
|
||||
CidrV4::new(addr, subnet.network.cidr())
|
||||
};
|
||||
|
||||
|
@ -85,7 +88,12 @@ pub async fn new_instance(
|
|||
};
|
||||
|
||||
// generate cloud-init data
|
||||
let db_inst = Instance::insert(&ctx, &args.name, &subnet, lease.clone(), None).await?;
|
||||
let db_inst = {
|
||||
let inst = Instance::insert(&ctx, &args.name, &subnet, lease.clone(), None)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?;
|
||||
Transaction::begin(&ctx, inst)
|
||||
};
|
||||
|
||||
progress!(prog_task, 30.0, "Creating instance images...");
|
||||
// create primary volume from base image
|
||||
|
@ -96,7 +104,7 @@ pub async fn new_instance(
|
|||
datasize!((args.disk_sizes.0) GiB),
|
||||
)
|
||||
.await
|
||||
.map_err(|er| cmd_error!("Failed to clone base image: {}", er))?;
|
||||
.to_api_with("Failed to clone base image")?;
|
||||
|
||||
// and, if it exists: the second volume
|
||||
let sec_vol = match args.disk_sizes.1 {
|
||||
|
@ -104,7 +112,11 @@ pub async fn new_instance(
|
|||
let voldata =
|
||||
// TODO: Fix VolType
|
||||
xml::Volume::new(&args.name, xml::VolType::Qcow2, datasize!(sec_size GiB));
|
||||
Some(vol::Volume::create(&ctx.virt.pools.secondary, voldata, 0).await?)
|
||||
Some(
|
||||
vol::Volume::create(&ctx.virt.pools.secondary, voldata, 0)
|
||||
.await
|
||||
.to_api_with("Couldn't create secondary volume")?,
|
||||
)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
@ -168,15 +180,20 @@ pub async fn new_instance(
|
|||
.build()
|
||||
};
|
||||
|
||||
let mut virt_dom = ctx.virt.conn.define_instance(dom_xml).await?;
|
||||
let mut virt_dom = ctx
|
||||
.virt
|
||||
.conn
|
||||
.define_instance(dom_xml)
|
||||
.await
|
||||
.to_api_with("Couldn't define libvirt instance")?;
|
||||
|
||||
// not a fatal error, we can set autostart afterward
|
||||
if let Err(er) = virt_dom.autostart(true).await {
|
||||
warn!("Couldn't set autostart for domain: {}", er);
|
||||
if let Err(err) = virt_dom.autostart(true).await {
|
||||
warn!("Couldn't set autostart for domain: {err}");
|
||||
}
|
||||
|
||||
if let Err(er) = virt_dom.start().await {
|
||||
warn!("Domain defined, but couldn't be started! Error: {}", er);
|
||||
if let Err(err) = virt_dom.start().await {
|
||||
warn!("Domain defined, but couldn't be started! Error: {err}");
|
||||
}
|
||||
|
||||
// set all volumes to persistent to avoid deletion
|
||||
|
@ -188,40 +205,76 @@ pub async fn new_instance(
|
|||
|
||||
progress!(prog_task, 80.0, "Domain created!");
|
||||
debug!("Domain {} created!", virt_dom.xml().await.name.as_str());
|
||||
Ok((db_inst, virt_dom))
|
||||
Ok((db_inst.take(), virt_dom))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_instance(ctx: Context, name: String) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let Some(inst_db) = Instance::get_by_name(&ctx, &name).await? else {
|
||||
return Err(cmd_error!("Instance {name} not found"));
|
||||
pub async fn delete_instance(
|
||||
ctx: Context,
|
||||
name: String,
|
||||
) -> Result<Option<model::Instance>, ApiError> {
|
||||
let Some(inst_db) = Instance::get_by_name(&ctx, &name)
|
||||
.await
|
||||
.to_api_with_type(ErrorType::Database, "Couldn't find instance")?
|
||||
else {
|
||||
return Err(ErrorType::NotFound.into());
|
||||
};
|
||||
let api_model = match inst_db.api_model(&ctx).await {
|
||||
Ok(model) => Some(model),
|
||||
Err(err) => {
|
||||
warn!("Couldn't get API model to notify clients: {err}");
|
||||
None
|
||||
}
|
||||
};
|
||||
// First, destroy the instance
|
||||
match ctx.virt.conn.get_instance(name.clone()).await {
|
||||
Ok(mut inst) => {
|
||||
inst.stop().await?;
|
||||
inst.undefine(true).await?;
|
||||
inst.stop().await.to_api_with("Couldn't stop instance")?;
|
||||
inst.undefine(true)
|
||||
.await
|
||||
.to_api_with("Couldn't undefine instance")?;
|
||||
}
|
||||
Err(DomainError::DomainNotFound) => {
|
||||
warn!("Deleting instance that exists in DB but not libvirt");
|
||||
}
|
||||
Err(err) => Err(err)?,
|
||||
Err(err) => Err(ApiError::new(
|
||||
nzr_api::error::ErrorType::VirtError,
|
||||
"Couldn't get instance from libvirt",
|
||||
err,
|
||||
))?,
|
||||
}
|
||||
// Then, delete the DB entity
|
||||
inst_db.delete(&ctx).await?;
|
||||
inst_db
|
||||
.delete(&ctx)
|
||||
.await
|
||||
.to_api_with("Couldn't delete from database")?;
|
||||
|
||||
Ok(())
|
||||
Ok(api_model)
|
||||
}
|
||||
|
||||
/// Delete all instances that don't have a matching libvirt domain
|
||||
pub async fn prune_instances(ctx: &Context) -> Result<(), Box<dyn std::error::Error>> {
|
||||
for entity in Instance::all(ctx).await? {
|
||||
if let Err(err) = ctx.virt.conn.get_instance(&entity.name).await {
|
||||
if err == DomainError::DomainNotFound {
|
||||
info!("Invalid domain {}, deleting", &entity.name);
|
||||
let name = entity.name.clone();
|
||||
if let Err(err) = entity.delete(ctx).await {
|
||||
warn!("Couldn't delete {}: {}", name, err);
|
||||
if let Err(DomainError::DomainNotFound) = ctx.virt.conn.get_instance(&entity.name).await {
|
||||
info!("Invalid domain {}, deleting", &entity.name);
|
||||
// First, get the API model to notify clients with
|
||||
let api_model = match entity.api_model(ctx).await {
|
||||
Ok(ent) => Some(ent),
|
||||
Err(err) => {
|
||||
warn!("Couldn't get api model to notify clients: {err}");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// then, delete by name
|
||||
let name = entity.name.clone();
|
||||
if let Err(err) = entity.delete(ctx).await {
|
||||
warn!("Couldn't delete {}: {}", name, err);
|
||||
}
|
||||
|
||||
// and assuming all goes well, notify clients
|
||||
if let Some(ent) = api_model {
|
||||
nzr_event!(ctx.events, Deleted, ent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,13 +3,11 @@ use diesel::{
|
|||
SqliteConnection,
|
||||
};
|
||||
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
|
||||
use log::trace;
|
||||
use nzr_virt::{vol, Connection};
|
||||
use std::ops::Deref;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::dns::ZoneData;
|
||||
use nzr_api::config::Config;
|
||||
use nzr_api::{config::Config, event::server::EventServer};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -42,8 +40,8 @@ impl Deref for Context {
|
|||
pub struct InnerCtx {
|
||||
pub sqldb: diesel::r2d2::Pool<ConnectionManager<SqliteConnection>>,
|
||||
pub config: Config,
|
||||
pub zones: crate::dns::ZoneData,
|
||||
pub virt: VirtCtx,
|
||||
pub events: Arc<EventServer>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
|
@ -60,7 +58,6 @@ pub enum ContextError {
|
|||
|
||||
impl InnerCtx {
|
||||
async fn new(config: Config) -> Result<Self, ContextError> {
|
||||
let zones = ZoneData::new(&config.dns);
|
||||
let conn = Connection::open(&config.libvirt_uri)?;
|
||||
|
||||
let pools = PoolRefs {
|
||||
|
@ -69,7 +66,7 @@ impl InnerCtx {
|
|||
baseimg: conn.get_pool(&config.storage.base_image_pool).await?,
|
||||
};
|
||||
|
||||
trace!("Connecting to database");
|
||||
tracing::trace!("Connecting to database");
|
||||
let db_uri = config.db_uri.clone();
|
||||
let sqldb = tokio::task::spawn_blocking(|| {
|
||||
let manager = ConnectionManager::<SqliteConnection>::new(db_uri);
|
||||
|
@ -80,7 +77,7 @@ impl InnerCtx {
|
|||
.unwrap()?;
|
||||
|
||||
{
|
||||
trace!("Running pending migrations");
|
||||
tracing::trace!("Running pending migrations");
|
||||
let mut conn = sqldb.get()?;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
conn.run_pending_migrations(MIGRATIONS)
|
||||
|
@ -90,11 +87,13 @@ impl InnerCtx {
|
|||
.unwrap()?;
|
||||
}
|
||||
|
||||
let events = Arc::new(EventServer::new());
|
||||
|
||||
Ok(Self {
|
||||
sqldb,
|
||||
config,
|
||||
zones,
|
||||
virt: VirtCtx { conn, pools },
|
||||
events,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
16
nzrd/src/event.rs
Normal file
16
nzrd/src/event.rs
Normal file
|
@ -0,0 +1,16 @@
|
|||
use std::io;
|
||||
|
||||
use tokio::net::UnixListener;
|
||||
|
||||
use crate::ctx::Context;
|
||||
|
||||
pub async fn event_server(ctx: Context) -> io::Result<()> {
|
||||
let sock = UnixListener::bind(&ctx.config.rpc.events_sock)?;
|
||||
|
||||
loop {
|
||||
// Wait until we have an available slot for a connection
|
||||
ctx.events.until_available().await;
|
||||
let (client, addr) = sock.accept().await?;
|
||||
ctx.events.spawn(client, addr).await?;
|
||||
}
|
||||
}
|
|
@ -1,79 +1,41 @@
|
|||
mod cmd;
|
||||
mod ctrl;
|
||||
mod ctx;
|
||||
mod dns;
|
||||
mod event;
|
||||
mod model;
|
||||
mod rpc;
|
||||
|
||||
use hickory_server::ServerFuture;
|
||||
use log::LevelFilter;
|
||||
use log::*;
|
||||
use model::{Instance, Subnet};
|
||||
use std::str::FromStr;
|
||||
|
||||
use nzr_api::config;
|
||||
use std::{net::IpAddr, str::FromStr};
|
||||
use tokio::net::UdpSocket;
|
||||
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let cfg: config::Config = config::Config::figment().extract()?;
|
||||
let ctx = ctx::Context::new(cfg).await?;
|
||||
|
||||
syslog::init_unix(
|
||||
syslog::Facility::LOG_DAEMON,
|
||||
LevelFilter::from_str(ctx.config.log_level.as_str())?,
|
||||
)?;
|
||||
let mut bad_loglevel = false;
|
||||
let log_level = tracing::Level::from_str(&ctx.config.log_level).unwrap_or_else(|_| {
|
||||
bad_loglevel = true;
|
||||
tracing::Level::WARN
|
||||
});
|
||||
|
||||
info!("Hydrating initial zones...");
|
||||
for subnet in Subnet::all(&ctx).await? {
|
||||
// A records
|
||||
if let Err(err) = ctx.zones.new_zone(&subnet).await {
|
||||
error!("Couldn't create zone for {}: {}", &subnet.ifname, err);
|
||||
continue;
|
||||
}
|
||||
match Instance::all_in_subnet(&ctx, &subnet).await {
|
||||
Ok(leases) => {
|
||||
for lease in leases {
|
||||
let Ok(lease_addr) = subnet.network.make_ip(lease.host_num as u32) else {
|
||||
warn!("Ignoring {} due to lease address issue", &lease.name);
|
||||
continue;
|
||||
};
|
||||
tracing_subscriber::fmt().with_max_level(log_level).init();
|
||||
|
||||
if let Err(err) = ctx
|
||||
.zones
|
||||
.new_record(&subnet.ifname.to_string(), &lease.name, lease_addr)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
"Failed to set up lease for {} in {}: {}",
|
||||
&lease.name, &subnet.ifname, err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Couldn't get leases for {}: {}", &subnet.ifname, err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if bad_loglevel {
|
||||
tracing::warn!("Couldn't parse log level from config, defaulting to {log_level}");
|
||||
}
|
||||
|
||||
// DNS init
|
||||
let mut dns_listener = ServerFuture::new(ctx.zones.catalog());
|
||||
let dns_socket = {
|
||||
let dns_ip: IpAddr = ctx.config.dns.listen_addr.parse()?;
|
||||
UdpSocket::bind((dns_ip, ctx.config.dns.port)).await?
|
||||
};
|
||||
dns_listener.register_socket(dns_socket);
|
||||
|
||||
// Run both the RPC and events servers
|
||||
tokio::select! {
|
||||
res = rpc::serve(ctx.clone()) => {
|
||||
if let Err(err) = res {
|
||||
error!("Error from RPC: {}", err);
|
||||
tracing::error!("RPC server error: {err}");
|
||||
}
|
||||
},
|
||||
res = dns_listener.block_until_done() => {
|
||||
res = event::event_server(ctx.clone()) => {
|
||||
if let Err(err) = res {
|
||||
error!("Error from DNS: {}", err);
|
||||
tracing::error!("Event server error: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,12 +20,14 @@ use tx::Transactable;
|
|||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ModelError {
|
||||
#[error("Database error occurred: {0}")]
|
||||
#[error("{0}")]
|
||||
Db(#[from] diesel::result::Error),
|
||||
#[error("Unable to get database handle: {0}")]
|
||||
#[error("Database pool error ({0})")]
|
||||
Pool(#[from] diesel::r2d2::PoolError),
|
||||
#[error("{0}")]
|
||||
Cidr(#[from] cidr::Error),
|
||||
#[error("Instance belongs to a subnet that has since disappeared")]
|
||||
NoSubnet,
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
|
@ -247,10 +249,7 @@ impl Instance {
|
|||
|
||||
/// Creates an [nzr_api::model::Instance] from the information available in
|
||||
/// the database.
|
||||
pub async fn api_model(
|
||||
&self,
|
||||
ctx: &Context,
|
||||
) -> Result<nzr_api::model::Instance, Box<dyn std::error::Error>> {
|
||||
pub async fn api_model(&self, ctx: &Context) -> Result<nzr_api::model::Instance, ModelError> {
|
||||
let netid = self.subnet_id;
|
||||
let Some(subnet) = ctx
|
||||
.spawn_db(move |mut db| Subnet::table().find(netid).load::<Subnet>(&mut db))
|
||||
|
@ -258,7 +257,7 @@ impl Instance {
|
|||
.into_iter()
|
||||
.next()
|
||||
else {
|
||||
todo!("something went horribly wrong");
|
||||
return Err(ModelError::NoSubnet);
|
||||
};
|
||||
|
||||
Ok(nzr_api::model::Instance {
|
||||
|
@ -404,7 +403,7 @@ impl Subnet {
|
|||
// the API.
|
||||
Ok(addr) => Some(addr),
|
||||
Err(err) => {
|
||||
log::error!(
|
||||
tracing::error!(
|
||||
"Error parsing DNS server '{}' for {}: {}",
|
||||
s,
|
||||
&self.name,
|
||||
|
@ -416,7 +415,7 @@ impl Subnet {
|
|||
.collect(),
|
||||
domain_name: self.domain_name.as_ref().map(|s| {
|
||||
Name::from_str(s).unwrap_or_else(|e| {
|
||||
log::error!("Error parsing DNS name for {}: {}", &self.name, e);
|
||||
tracing::error!("Error parsing DNS name for {}: {}", &self.name, e);
|
||||
Name::default()
|
||||
})
|
||||
}),
|
||||
|
|
|
@ -48,7 +48,7 @@ impl<'a, T: Transactable> Drop for Transaction<'a, T> {
|
|||
let ctx = self.ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = inner.undo_tx(&ctx).await {
|
||||
log::error!("Error undoing transaction: {err}");
|
||||
tracing::error!("Error undoing transaction: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
164
nzrd/src/rpc.rs
164
nzrd/src/rpc.rs
|
@ -1,5 +1,6 @@
|
|||
use futures::{future, StreamExt};
|
||||
use nzr_api::{args, model, InstanceQuery, Nazrin};
|
||||
use nzr_api::error::{ApiError, ErrorType, ToApiResult};
|
||||
use nzr_api::{args, model, nzr_event, InstanceQuery, Nazrin};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tarpc::server::{BaseChannel, Channel};
|
||||
|
@ -13,8 +14,8 @@ use uuid::Uuid;
|
|||
use crate::cmd;
|
||||
use crate::ctx::Context;
|
||||
use crate::model::{Instance, SshPubkey, Subnet};
|
||||
use log::*;
|
||||
use std::collections::HashMap;
|
||||
use tracing::*;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NzrServer {
|
||||
|
@ -36,7 +37,7 @@ impl Nazrin for NzrServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
build_args: args::NewInstance,
|
||||
) -> Result<uuid::Uuid, String> {
|
||||
) -> Result<uuid::Uuid, ApiError> {
|
||||
let progress = Arc::new(RwLock::new(crate::ctrl::vm::Progress {
|
||||
status_text: "Starting...".to_owned(),
|
||||
percentage: 0.0,
|
||||
|
@ -44,13 +45,11 @@ impl Nazrin for NzrServer {
|
|||
let prog_task = progress.clone();
|
||||
let build_task = tokio::spawn(async move {
|
||||
let (inst, dom) =
|
||||
cmd::vm::new_instance(self.ctx.clone(), prog_task.clone(), &build_args)
|
||||
.await
|
||||
.map_err(|e| format!("Instance creation failed: {}", e))?;
|
||||
cmd::vm::new_instance(self.ctx.clone(), prog_task.clone(), &build_args).await?;
|
||||
let mut api_model = inst
|
||||
.api_model(&self.ctx)
|
||||
.await
|
||||
.map_err(|e| format!("Couldn't generate API response: {e}"))?;
|
||||
.to_api_with("Couldn't generate API response")?;
|
||||
match dom.state().await {
|
||||
Ok(state) => {
|
||||
api_model.state = state.into();
|
||||
|
@ -59,6 +58,9 @@ impl Nazrin for NzrServer {
|
|||
warn!("Unable to get instance state: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
// Inform event listeners
|
||||
nzr_event!(self.ctx.events, Created, api_model);
|
||||
Ok(api_model)
|
||||
});
|
||||
|
||||
|
@ -94,7 +96,7 @@ impl Nazrin for NzrServer {
|
|||
Some(
|
||||
task.inner
|
||||
.await
|
||||
.map_err(|err| format!("Task failed with panic: {}", err))
|
||||
.to_api_with("Task failed with panic")
|
||||
.and_then(|res| res),
|
||||
)
|
||||
} else {
|
||||
|
@ -108,10 +110,16 @@ impl Nazrin for NzrServer {
|
|||
})
|
||||
}
|
||||
|
||||
async fn delete_instance(self, _: tarpc::context::Context, name: String) -> Result<(), String> {
|
||||
cmd::vm::delete_instance(self.ctx.clone(), name)
|
||||
.await
|
||||
.map_err(|e| format!("Couldn't delete instance: {}", e))?;
|
||||
async fn delete_instance(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
name: String,
|
||||
) -> Result<(), ApiError> {
|
||||
let api_model = cmd::vm::delete_instance(self.ctx.clone(), name).await?;
|
||||
|
||||
if let Some(api_model) = api_model {
|
||||
nzr_event!(self.ctx.events, Deleted, api_model);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -119,18 +127,16 @@ impl Nazrin for NzrServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
query: nzr_api::InstanceQuery,
|
||||
) -> Result<Option<model::Instance>, String> {
|
||||
) -> Result<Option<model::Instance>, ApiError> {
|
||||
let res = match query {
|
||||
InstanceQuery::Name(name) => Instance::get_by_name(&self.ctx, name).await,
|
||||
InstanceQuery::MacAddr(addr) => Instance::get_by_mac(&self.ctx, addr).await,
|
||||
InstanceQuery::Ipv4Addr(addr) => Instance::get_by_ip4(&self.ctx, addr).await,
|
||||
}
|
||||
.map_err(|e| e.to_string())?;
|
||||
.to_api()?;
|
||||
|
||||
if let Some(inst) = res {
|
||||
inst.api_model(&self.ctx)
|
||||
.await
|
||||
.map_or_else(|e| Err(e.to_string()), |m| Ok(Some(m)))
|
||||
inst.api_model(&self.ctx).await.to_api().map(Some)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
|
@ -140,10 +146,10 @@ impl Nazrin for NzrServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
with_status: bool,
|
||||
) -> Result<Vec<model::Instance>, String> {
|
||||
) -> Result<Vec<model::Instance>, ApiError> {
|
||||
let db_models = Instance::all(&self.ctx)
|
||||
.await
|
||||
.map_err(|e| format!("Unable to get all instances: {e}"))?;
|
||||
.to_api_type(ErrorType::Database)?;
|
||||
let mut models = Vec::new();
|
||||
for inst in db_models {
|
||||
let mut api_model = match inst.api_model(&self.ctx).await {
|
||||
|
@ -181,34 +187,39 @@ impl Nazrin for NzrServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
build_args: model::Subnet,
|
||||
) -> Result<model::Subnet, String> {
|
||||
cmd::net::add_subnet(&self.ctx, build_args)
|
||||
) -> Result<model::Subnet, ApiError> {
|
||||
let subnet = Subnet::insert(&self.ctx, build_args.name, build_args.data)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?
|
||||
.to_api_type(ErrorType::Database)?
|
||||
.api_model()
|
||||
.map_err(|e| e.to_string())
|
||||
.to_api_with("Unable to generate API model")?;
|
||||
|
||||
// inform event listeners
|
||||
nzr_event!(self.ctx.events, Created, subnet);
|
||||
Ok(subnet)
|
||||
}
|
||||
|
||||
async fn modify_subnet(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
edit_args: model::Subnet,
|
||||
) -> Result<model::Subnet, String> {
|
||||
) -> Result<model::Subnet, ApiError> {
|
||||
if let Some(subnet) = Subnet::get_by_name(&self.ctx, &edit_args.name)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?
|
||||
{
|
||||
todo!("support updating Subnets")
|
||||
Err("Modifying subnets not yet supported".into())
|
||||
} else {
|
||||
Err(format!("Subnet {} not found", &edit_args.name))
|
||||
Err(ErrorType::NotFound.into())
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_subnets(self, _: tarpc::context::Context) -> Result<Vec<model::Subnet>, String> {
|
||||
Subnet::all(&self.ctx).await.map_or_else(
|
||||
|e| Err(e.to_string()),
|
||||
|v| {
|
||||
Ok(v.into_iter()
|
||||
async fn get_subnets(self, _: tarpc::context::Context) -> Result<Vec<model::Subnet>, ApiError> {
|
||||
Subnet::all(&self.ctx)
|
||||
.await
|
||||
.to_api_with("Couldn't get list of subnets")
|
||||
.map(|v| {
|
||||
v.into_iter()
|
||||
.filter_map(|s| match s.api_model() {
|
||||
Ok(model) => Some(model),
|
||||
Err(err) => {
|
||||
|
@ -216,23 +227,43 @@ impl Nazrin for NzrServer {
|
|||
None
|
||||
}
|
||||
})
|
||||
.collect())
|
||||
},
|
||||
)
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete_subnet(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
subnet_name: String,
|
||||
) -> Result<(), String> {
|
||||
cmd::net::delete_subnet(&self.ctx, &subnet_name)
|
||||
) -> Result<(), ApiError> {
|
||||
if let Some(subnet) = Subnet::get_by_name(&self.ctx, subnet_name)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
.to_api_type(ErrorType::Database)?
|
||||
{
|
||||
let api_model = match subnet.api_model() {
|
||||
Ok(model) => Some(model),
|
||||
Err(err) => {
|
||||
tracing::error!("Unable to generate model for clients: {err}");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
subnet
|
||||
.delete(&self.ctx)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?;
|
||||
|
||||
if let Some(api_model) = api_model {
|
||||
nzr_event!(&self.ctx.events, Deleted, api_model);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ErrorType::NotFound.into())
|
||||
}
|
||||
}
|
||||
|
||||
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), String> {
|
||||
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), ApiError> {
|
||||
cmd::vm::prune_instances(&self.ctx)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
@ -243,50 +274,52 @@ impl Nazrin for NzrServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
id: i32,
|
||||
) -> Result<Vec<u8>, String> {
|
||||
let Some(db_model) = Instance::get(&self.ctx, id)
|
||||
) -> Result<Vec<u8>, ApiError> {
|
||||
if let Some(db_model) = Instance::get(&self.ctx, id)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?
|
||||
else {
|
||||
return Err("Instance doesn't exist".to_owned());
|
||||
};
|
||||
|
||||
Ok(db_model.ci_userdata.unwrap_or_default())
|
||||
.to_api_type(ErrorType::Database)?
|
||||
{
|
||||
Ok(db_model.ci_userdata.unwrap_or_default())
|
||||
} else {
|
||||
Err(ErrorType::NotFound.into())
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_ssh_pubkeys(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
) -> Result<Vec<model::SshPubkey>, String> {
|
||||
SshPubkey::all(&self.ctx).await.map_or_else(
|
||||
|e| Err(e.to_string()),
|
||||
|k| Ok(k.iter().map(|k| k.api_model()).collect()),
|
||||
)
|
||||
) -> Result<Vec<model::SshPubkey>, ApiError> {
|
||||
SshPubkey::all(&self.ctx)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)
|
||||
.map(|k| k.iter().map(|k| k.api_model()).collect())
|
||||
}
|
||||
|
||||
async fn add_ssh_pubkey(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
pub_key: String,
|
||||
) -> Result<model::SshPubkey, String> {
|
||||
let pubkey = model::SshPubkey::from_str(&pub_key).map_err(|e| e.to_string())?;
|
||||
) -> Result<model::SshPubkey, ApiError> {
|
||||
let pubkey = model::SshPubkey::from_str(&pub_key).to_api_type(ErrorType::Parse)?;
|
||||
|
||||
SshPubkey::insert(&self.ctx, pubkey.algorithm, pubkey.key_data, pubkey.comment)
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
.to_api_type(ErrorType::Database)
|
||||
.map(|k| k.api_model())
|
||||
}
|
||||
|
||||
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), String> {
|
||||
let Some(key) = SshPubkey::get(&self.ctx, id)
|
||||
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), ApiError> {
|
||||
if let Some(key) = SshPubkey::get(&self.ctx, id)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?
|
||||
else {
|
||||
return Err("SSH key with ID doesn't exist".into());
|
||||
};
|
||||
|
||||
key.delete(&self.ctx).await.map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
.to_api_type(ErrorType::Database)?
|
||||
{
|
||||
key.delete(&self.ctx)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ErrorType::NotFound.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -324,7 +357,6 @@ pub async fn serve(ctx: Context) -> Result<(), Box<dyn std::error::Error>> {
|
|||
debug!("Listening for new connection...");
|
||||
let (conn, _addr) = listener.accept().await?;
|
||||
let ctx = ctx.clone();
|
||||
// hack?
|
||||
tokio::spawn(async move {
|
||||
let framed = codec_builder.new_framed(conn);
|
||||
let transport = tarpc::serde_transport::new(framed, Bincode::default());
|
||||
|
@ -340,6 +372,6 @@ pub async fn serve(ctx: Context) -> Result<(), Box<dyn std::error::Error>> {
|
|||
}
|
||||
|
||||
struct InstCreateStatus {
|
||||
inner: JoinHandle<Result<model::Instance, String>>,
|
||||
inner: JoinHandle<Result<model::Instance, ApiError>>,
|
||||
progress: Arc<RwLock<crate::ctrl::vm::Progress>>,
|
||||
}
|
||||
|
|
|
@ -125,11 +125,7 @@ async fn handle_message(ctx: &Context, from: SocketAddr, msg: &Message) {
|
|||
|
||||
{
|
||||
let opts = response.opts_mut();
|
||||
let giaddr = if msg.giaddr().is_unspecified() {
|
||||
todo!("no relay??")
|
||||
} else {
|
||||
msg.giaddr()
|
||||
};
|
||||
let giaddr = msg.giaddr();
|
||||
|
||||
opts.insert(DhcpOption::ServerIdentifier(giaddr));
|
||||
if let Some(time) = lease_time {
|
||||
|
|
15
nzrdns/Cargo.toml
Normal file
15
nzrdns/Cargo.toml
Normal file
|
@ -0,0 +1,15 @@
|
|||
[package]
|
||||
name = "nzrdns"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||
nzr-api = { path = "../nzr-api" }
|
||||
hickory-server = "0.24"
|
||||
hickory-proto = { version = "0.24", features = ["serde-config"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
async-trait = "0.1"
|
||||
futures = "0.3"
|
||||
anyhow = "1"
|
|
@ -1,14 +1,13 @@
|
|||
use crate::model::Subnet;
|
||||
use log::*;
|
||||
use nzr_api::config::DNSConfig;
|
||||
use std::borrow::Borrow;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::net::Ipv4Addr;
|
||||
use std::ops::Deref;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
use nzr_api::model::{Instance, SubnetData};
|
||||
|
||||
use hickory_proto::rr::Name;
|
||||
use hickory_server::authority::{AuthorityObject, Catalog};
|
||||
use hickory_server::proto::rr::{rdata::soa, RData, RecordSet};
|
||||
|
@ -70,7 +69,7 @@ pub struct InnerZD {
|
|||
}
|
||||
|
||||
pub fn make_rectree_with_soa(name: &Name, config: &DNSConfig) -> BTreeMap<RrKey, RecordSet> {
|
||||
debug!("Creating initial SOA for {}", &name);
|
||||
tracing::debug!("Creating initial SOA for {}", &name);
|
||||
let mut records: BTreeMap<RrKey, RecordSet> = BTreeMap::new();
|
||||
let soa_key = RrKey::new(
|
||||
LowerName::from(name),
|
||||
|
@ -119,24 +118,28 @@ impl InnerZD {
|
|||
}
|
||||
|
||||
/// Creates a new DNS zone for the given subnet.
|
||||
pub async fn new_zone(&self, subnet: &Subnet) -> Result<(), Box<dyn std::error::Error>> {
|
||||
pub async fn new_zone(
|
||||
&self,
|
||||
zone_id: impl AsRef<str>,
|
||||
subnet: &SubnetData,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if let Some(name) = &subnet.domain_name {
|
||||
let name: Name = name.parse()?;
|
||||
let rectree = make_rectree_with_soa(&name, &self.config);
|
||||
let rectree = make_rectree_with_soa(name, &self.config);
|
||||
let auth = InMemoryAuthority::new(
|
||||
name,
|
||||
name.clone(),
|
||||
rectree,
|
||||
hickory_server::authority::ZoneType::Primary,
|
||||
false,
|
||||
)?;
|
||||
self.import(&subnet.ifname.to_string(), auth).await;
|
||||
self.import(zone_id.as_ref(), auth).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn import(&self, name: &str, auth: InMemoryAuthority) {
|
||||
/// Generates a zone with the given records.
|
||||
async fn import(&self, name: &str, auth: InMemoryAuthority) {
|
||||
let auth_arc = Arc::new(auth);
|
||||
log::debug!(
|
||||
tracing::debug!(
|
||||
"Importing {} with {} records...",
|
||||
name,
|
||||
auth_arc.records().await.len()
|
||||
|
@ -159,26 +162,23 @@ impl InnerZD {
|
|||
}
|
||||
|
||||
/// Adds a new host record in the DNS zone.
|
||||
pub async fn new_record(
|
||||
&self,
|
||||
interface: &str,
|
||||
name: &str,
|
||||
addr: Ipv4Addr,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let hostname = Name::from_str(name)?;
|
||||
pub async fn new_record(&self, inst: &Instance) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let hostname = Name::from_str(&inst.name)?;
|
||||
let zones = self.map.lock().await;
|
||||
let zone = zones.get(interface).unwrap_or(&self.default_zone);
|
||||
let zone = zones.get(&inst.lease.subnet).unwrap_or(&self.default_zone);
|
||||
let fqdn = {
|
||||
let origin: Name = zone.origin().into();
|
||||
hostname.append_domain(&origin)?
|
||||
};
|
||||
|
||||
log::debug!(
|
||||
tracing::debug!(
|
||||
"Creating new host entry {} in zone {}...",
|
||||
&fqdn,
|
||||
zone.origin()
|
||||
);
|
||||
|
||||
let addr = inst.lease.addr.addr;
|
||||
|
||||
let record = Record::from_rdata(fqdn, 3600, RData::A(addr.into()));
|
||||
zone.upsert(record, 0).await;
|
||||
self.catalog()
|
||||
|
@ -189,14 +189,10 @@ impl InnerZD {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_record(
|
||||
&self,
|
||||
interface: &str,
|
||||
name: &str,
|
||||
) -> Result<bool, Box<dyn std::error::Error>> {
|
||||
let hostname = Name::from_str(name)?;
|
||||
pub async fn delete_record(&self, inst: &Instance) -> Result<bool, Box<dyn std::error::Error>> {
|
||||
let hostname = Name::from_str(&inst.name)?;
|
||||
let mut zones = self.map.lock().await;
|
||||
if let Some(zone) = zones.get_mut(interface) {
|
||||
if let Some(zone) = zones.get_mut(&inst.lease.subnet) {
|
||||
let hostname: LowerName = hostname.into();
|
||||
self.catalog.0.write().await.remove(&hostname);
|
||||
let key = RrKey::new(hostname, hickory_server::proto::rr::RecordType::A);
|
169
nzrdns/src/main.rs
Normal file
169
nzrdns/src/main.rs
Normal file
|
@ -0,0 +1,169 @@
|
|||
use std::{net::IpAddr, process::ExitCode};
|
||||
|
||||
use anyhow::Context;
|
||||
use dns::ZoneData;
|
||||
use futures::StreamExt;
|
||||
use hickory_server::ServerFuture;
|
||||
use nzr_api::{
|
||||
config::Config,
|
||||
event::{client::EventClient, EventMessage, ResourceAction},
|
||||
NazrinClient,
|
||||
};
|
||||
use tokio::{
|
||||
io::AsyncRead,
|
||||
net::{UdpSocket, UnixStream},
|
||||
};
|
||||
|
||||
mod dns;
|
||||
|
||||
/// Function to handle incoming events from Nazrin and update the DNS database
|
||||
/// accordingly.
|
||||
async fn event_handler<T: AsyncRead>(zones: ZoneData, mut events: EventClient<T>) {
|
||||
while let Some(event) = events.next().await {
|
||||
match event {
|
||||
Ok(EventMessage::Instance(event)) => {
|
||||
let ent = &event.entity;
|
||||
match event.action {
|
||||
ResourceAction::Created => {
|
||||
if let Err(err) = zones.new_record(ent).await {
|
||||
tracing::error!("Unable to add record {}: {err}", ent.name);
|
||||
}
|
||||
}
|
||||
ResourceAction::Deleted => {
|
||||
if let Err(err) = zones.delete_record(ent).await {
|
||||
tracing::error!("Unable to delete record {}: {err}", ent.name);
|
||||
}
|
||||
}
|
||||
misc => {
|
||||
tracing::debug!("ignoring instance action {misc:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(EventMessage::Subnet(event)) => {
|
||||
let ent = &event.entity;
|
||||
match event.action {
|
||||
ResourceAction::Created => {
|
||||
if let Some(name) = ent.data.domain_name.as_ref() {
|
||||
if let Err(err) = zones.new_zone(&ent.name, &ent.data).await {
|
||||
tracing::error!("Unable to add zone {name}: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
ResourceAction::Deleted => {
|
||||
if ent.data.domain_name.as_ref().is_some() {
|
||||
zones.delete_zone(&ent.name).await;
|
||||
}
|
||||
}
|
||||
misc => {
|
||||
tracing::debug!("ignoring subnet action {misc:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Error getting events: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::warn!("No more events! (did Nazrin shut down?)");
|
||||
}
|
||||
|
||||
/// Hydrates all existing DNS zones.
|
||||
async fn hydrate_zones(zones: ZoneData, api_client: NazrinClient) -> anyhow::Result<()> {
|
||||
tracing::info!("Hydrating initial zones...");
|
||||
let subnets = api_client
|
||||
.get_subnets(nzr_api::default_ctx())
|
||||
.await
|
||||
.context("RPC error getting subnets")?
|
||||
.map_err(|e| anyhow::anyhow!("API error getting subnets: {e}"))?;
|
||||
|
||||
let instances = api_client
|
||||
.get_instances(nzr_api::default_ctx(), false)
|
||||
.await
|
||||
.context("RPC error getting instances")?
|
||||
.map_err(|e| anyhow::anyhow!("API error getting instances: {e}"))?;
|
||||
|
||||
for subnet in subnets {
|
||||
if let Err(err) = zones.new_zone(&subnet.name, &subnet.data).await {
|
||||
tracing::warn!("Couldn't create zone for {}: {err}", &subnet.name);
|
||||
}
|
||||
}
|
||||
|
||||
for instance in instances {
|
||||
if let Err(err) = zones.new_record(&instance).await {
|
||||
tracing::warn!("Couldn't create zone entry for {}: {err}", &instance.name);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> ExitCode {
|
||||
tracing_subscriber::fmt::init();
|
||||
let cfg: Config = match Config::figment().extract() {
|
||||
Ok(cfg) => cfg,
|
||||
Err(err) => {
|
||||
tracing::error!("Error parsing config: {err}");
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
};
|
||||
|
||||
let api_client = {
|
||||
let sock = match UnixStream::connect(&cfg.rpc.socket_path).await {
|
||||
Ok(sock) => sock,
|
||||
Err(err) => {
|
||||
tracing::error!("Connection to nzrd failed: {err}");
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
};
|
||||
nzr_api::new_client(sock)
|
||||
};
|
||||
let events = {
|
||||
let sock = match UnixStream::connect(&cfg.rpc.events_sock).await {
|
||||
Ok(sock) => sock,
|
||||
Err(err) => {
|
||||
tracing::error!("Connections to events stream failed: {err}");
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
};
|
||||
nzr_api::event::client::EventClient::new(sock)
|
||||
};
|
||||
|
||||
let zones = ZoneData::new(&cfg.dns);
|
||||
|
||||
if let Err(err) = hydrate_zones(zones.clone(), api_client.clone()).await {
|
||||
tracing::error!("{err}");
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
|
||||
let mut dns_listener = ServerFuture::new(zones.catalog());
|
||||
let dns_socket = {
|
||||
let Ok(dns_ip) = cfg.dns.listen_addr.parse::<IpAddr>() else {
|
||||
tracing::error!("Unable to parse listen_addr");
|
||||
return ExitCode::FAILURE;
|
||||
};
|
||||
|
||||
match UdpSocket::bind((dns_ip, cfg.dns.port)).await {
|
||||
Ok(sock) => sock,
|
||||
Err(err) => {
|
||||
tracing::error!("Couldn't bind to {dns_ip}:{}: {err}", cfg.dns.port);
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
}
|
||||
};
|
||||
dns_listener.register_socket(dns_socket);
|
||||
|
||||
tokio::select! {
|
||||
_ = event_handler(zones.clone(), events) => {
|
||||
// nothing to do here
|
||||
},
|
||||
res = dns_listener.block_until_done() => {
|
||||
if let Err(err) = res {
|
||||
tracing::error!("Error from DNS: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ExitCode::SUCCESS
|
||||
}
|
|
@ -57,7 +57,7 @@ impl Context {
|
|||
}
|
||||
|
||||
pub async fn get_sshkeys(&self) -> Result<Vec<SshPubkey>> {
|
||||
// TODO: do we cache SSH keys? I don't like the idea of it
|
||||
// We don't cache SSH keys, so always get from the API server
|
||||
let ssh_keys = self
|
||||
.api_client
|
||||
.get_ssh_pubkeys(nzr_api::default_ctx())
|
||||
|
|
|
@ -41,7 +41,6 @@ async fn get_meta_data(
|
|||
Ok(Some(inst)) => {
|
||||
let meta = Metadata {
|
||||
inst_name: &inst.name,
|
||||
// XXX: this is very silly imo
|
||||
ssh_pubkeys: ssh_pubkeys.iter().collect(),
|
||||
};
|
||||
|
||||
|
@ -99,7 +98,7 @@ async fn get_vendor_data(
|
|||
// admin username from an unknown instance.
|
||||
if let IpAddr::V4(ip) = addr.ip() {
|
||||
match ctx.get_instance(ip).await {
|
||||
Ok(_) => {
|
||||
Ok(Some(_)) => {
|
||||
let data = model::VendorData {
|
||||
username: Some(&ctx.cfg().cloud.admin_user),
|
||||
};
|
||||
|
@ -108,14 +107,14 @@ async fn get_vendor_data(
|
|||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})
|
||||
}
|
||||
Ok(None) => {
|
||||
tracing::warn!("Request from unregistered server {ip}");
|
||||
Err(StatusCode::FORBIDDEN)
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("{err}");
|
||||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
_ => {
|
||||
tracing::warn!("Request from unregistered server {ip}");
|
||||
Err(StatusCode::FORBIDDEN)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Err(StatusCode::BAD_REQUEST)
|
||||
|
|
Loading…
Reference in a new issue