Compare commits
No commits in common. "40532c9e36443fe38775adadd15b383ec7f17c8d" and "0cb3aea62e27278a9db885c101f9bdfe90d4f96b" have entirely different histories.
40532c9e36
...
0cb3aea62e
30 changed files with 406 additions and 1127 deletions
75
Cargo.lock
generated
75
Cargo.lock
generated
|
@ -1046,6 +1046,17 @@ dependencies = [
|
|||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hostname"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"windows",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "1.1.0"
|
||||
|
@ -1521,6 +1532,15 @@ dependencies = [
|
|||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_threads"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nzr"
|
||||
version = "0.9.0"
|
||||
|
@ -1548,12 +1568,9 @@ dependencies = [
|
|||
"log",
|
||||
"regex",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tarpc",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-serde 0.9.0",
|
||||
"tracing",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
|
@ -1587,6 +1604,7 @@ dependencies = [
|
|||
"home",
|
||||
"libc",
|
||||
"libsqlite3-sys",
|
||||
"log",
|
||||
"nix",
|
||||
"nzr-api",
|
||||
"nzr-virt",
|
||||
|
@ -1598,13 +1616,12 @@ dependencies = [
|
|||
"serde_with",
|
||||
"serde_yaml",
|
||||
"stdext",
|
||||
"syslog",
|
||||
"tarpc",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-serde 0.9.0",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"trait-variant",
|
||||
"uuid",
|
||||
"zerocopy",
|
||||
|
@ -1624,21 +1641,6 @@ dependencies = [
|
|||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nzrdns"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"futures",
|
||||
"hickory-proto",
|
||||
"hickory-server",
|
||||
"nzr-api",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.36.2"
|
||||
|
@ -2289,6 +2291,18 @@ version = "1.0.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"
|
||||
|
||||
[[package]]
|
||||
name = "syslog"
|
||||
version = "7.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "019f1500a13379b7d051455df397c75770de6311a7a188a699499502704d9f10"
|
||||
dependencies = [
|
||||
"hostname",
|
||||
"libc",
|
||||
"log",
|
||||
"time",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tabled"
|
||||
version = "0.15.0"
|
||||
|
@ -2412,7 +2426,9 @@ checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885"
|
|||
dependencies = [
|
||||
"deranged",
|
||||
"itoa",
|
||||
"libc",
|
||||
"num-conv",
|
||||
"num_threads",
|
||||
"powerfmt",
|
||||
"serde",
|
||||
"time-core",
|
||||
|
@ -2926,6 +2942,25 @@ version = "0.4.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "windows"
|
||||
version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
|
||||
dependencies = [
|
||||
"windows-core",
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-core"
|
||||
version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
|
||||
dependencies = [
|
||||
"windows-targets",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.42.0"
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
[workspace]
|
||||
members = ["nzrd", "nzr-api", "client", "nzrdhcp", "nzr-virt", "omyacid", "nzrdns"]
|
||||
members = ["nzrd", "nzr-api", "client", "nzrdhcp", "nzr-virt", "omyacid"]
|
||||
resolver = "2"
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
use clap::{CommandFactory, FromArgMatches, Parser, Subcommand};
|
||||
use nzr_api::config;
|
||||
use nzr_api::error::Simplify;
|
||||
use nzr_api::hickory_proto::rr::Name;
|
||||
use nzr_api::model;
|
||||
use nzr_api::net::cidr::CidrV4;
|
||||
|
@ -333,9 +332,9 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
|
|||
let mut net = client
|
||||
.get_subnets(nzr_api::default_ctx())
|
||||
.await
|
||||
.simplify()
|
||||
.map_err(|e| e.to_string())
|
||||
.and_then(|res| {
|
||||
res.iter()
|
||||
res?.iter()
|
||||
.find_map(|ent| {
|
||||
if ent.name == args.name {
|
||||
Some(ent.clone())
|
||||
|
@ -343,7 +342,7 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
|
|||
None
|
||||
}
|
||||
})
|
||||
.ok_or_else(|| format!("Couldn't find network {}", &args.name).into())
|
||||
.ok_or_else(|| format!("Couldn't find network {}", &args.name))
|
||||
})?;
|
||||
|
||||
// merge in the new args
|
||||
|
@ -366,11 +365,15 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
|
|||
}
|
||||
|
||||
// run the update
|
||||
let net = client
|
||||
client
|
||||
.modify_subnet(nzr_api::default_ctx(), net)
|
||||
.await
|
||||
.simplify()?;
|
||||
println!("Subnet {} updated.", net.name);
|
||||
.map_err(|err| format!("RPC error: {}", err))
|
||||
.and_then(|res| {
|
||||
res.map(|e| {
|
||||
println!("Subnet {} updated.", e.name);
|
||||
})
|
||||
})?;
|
||||
}
|
||||
NetCmd::Dump { name } => {
|
||||
let subnets = (client.get_subnets(nzr_api::default_ctx()).await?)?;
|
||||
|
|
|
@ -17,17 +17,14 @@ uuid = { version = "1.2.2", features = ["serde"] }
|
|||
hickory-proto = { version = "0.24", features = ["serde-config"] }
|
||||
log = "0.4.17"
|
||||
diesel = { version = "2.2", optional = true }
|
||||
futures = "0.3"
|
||||
futures = { version = "0.3", optional = true }
|
||||
thiserror = "1"
|
||||
regex = "1"
|
||||
lazy_static = "1"
|
||||
tracing = "0.1"
|
||||
tokio-serde = { version = "0.9", features = ["bincode"] }
|
||||
serde_json = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
uuid = { version = "1.2.2", features = ["serde", "v4"] }
|
||||
|
||||
[features]
|
||||
diesel = ["dep:diesel"]
|
||||
mock = []
|
||||
mock = ["dep:futures"]
|
||||
|
|
|
@ -72,7 +72,6 @@ impl CloudConfig {
|
|||
pub struct RPCConfig {
|
||||
pub socket_path: PathBuf,
|
||||
pub admin_group: Option<String>,
|
||||
pub events_sock: PathBuf,
|
||||
}
|
||||
|
||||
/// The root configuration struct.
|
||||
|
@ -99,7 +98,6 @@ impl Default for Config {
|
|||
rpc: RPCConfig {
|
||||
socket_path: PathBuf::from("/var/run/nazrin/nzrd.sock"),
|
||||
admin_group: None,
|
||||
events_sock: PathBuf::from("/var/run/nazrin/events.sock"),
|
||||
},
|
||||
db_uri: "sqlite:/var/lib/nazrin/main_sql.db".to_owned(),
|
||||
libvirt_uri: match std::env::var("LIBVIRT_URI") {
|
||||
|
|
|
@ -1,208 +0,0 @@
|
|||
use std::fmt;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tarpc::client::RpcError;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub enum ErrorType {
|
||||
/// Entity was not found.
|
||||
NotFound,
|
||||
/// Error occurred with a database call.
|
||||
Database,
|
||||
/// Error occurred in a libvirt call.
|
||||
VirtError,
|
||||
/// Error occurred while parsing input.
|
||||
Parse,
|
||||
/// An unknown API error occurred.
|
||||
Other,
|
||||
}
|
||||
|
||||
impl ErrorType {
|
||||
fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
ErrorType::NotFound => "Entity not found",
|
||||
ErrorType::Database => "Database error",
|
||||
ErrorType::VirtError => "libvirt error",
|
||||
ErrorType::Parse => "Unable to parse input",
|
||||
ErrorType::Other => "Unknown API error",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ErrorType {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.as_str().fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ApiError {
|
||||
error_type: ErrorType,
|
||||
message: Option<String>,
|
||||
inner: Option<InnerError>,
|
||||
}
|
||||
|
||||
impl fmt::Display for ApiError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.message
|
||||
.as_deref()
|
||||
.unwrap_or_else(|| self.error_type.as_str())
|
||||
.fmt(f)?;
|
||||
if let Some(inner) = &self.inner {
|
||||
write!(f, ": {inner}")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ApiError {}
|
||||
|
||||
impl ApiError {
|
||||
pub fn new<E>(error_type: ErrorType, message: impl Into<String>, err: E) -> Self
|
||||
where
|
||||
E: std::error::Error,
|
||||
{
|
||||
Self {
|
||||
error_type,
|
||||
message: Some(message.into()),
|
||||
inner: Some(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn err_type(&self) -> ErrorType {
|
||||
self.error_type
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ErrorType> for ApiError {
|
||||
fn from(value: ErrorType) -> Self {
|
||||
Self {
|
||||
error_type: value,
|
||||
message: None,
|
||||
inner: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<T> for ApiError
|
||||
where
|
||||
T: AsRef<str>,
|
||||
{
|
||||
fn from(value: T) -> Self {
|
||||
let value = value.as_ref();
|
||||
Self {
|
||||
error_type: ErrorType::Other,
|
||||
message: Some(value.to_owned()),
|
||||
inner: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct InnerError {
|
||||
error_debug: String,
|
||||
error_message: String,
|
||||
}
|
||||
|
||||
impl fmt::Debug for InnerError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.error_debug.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for InnerError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.error_message.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> From<E> for InnerError
|
||||
where
|
||||
E: std::error::Error,
|
||||
{
|
||||
fn from(value: E) -> Self {
|
||||
Self {
|
||||
error_debug: format!("{value:?}"),
|
||||
error_message: format!("{value}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ToApiResult<T> {
|
||||
/// Converts the result's error type to [`ApiError`] with [`ErrorType::Other`].
|
||||
///
|
||||
/// [`ApiError`]: nzr-api::error::ApiError
|
||||
/// [`ErrorType::Other`]: nzr-api::error::ErrorType::Other
|
||||
fn to_api(self) -> Result<T, ApiError>;
|
||||
/// Converts the result's error type to [`ApiError`] with
|
||||
/// [`ErrorType::Other`], and the given context.
|
||||
///
|
||||
/// [`ApiError`]: nzr-api::error::ApiError
|
||||
/// [`ErrorType::Other`]: nzr-api::error::ErrorType::Other
|
||||
fn to_api_with(self, context: impl AsRef<str>) -> Result<T, ApiError>;
|
||||
/// Converts the result's error type to [`ApiError`] with the given
|
||||
/// [`ErrorType`] and context.
|
||||
///
|
||||
/// [`ApiError`]: nzr-api::error::ApiError
|
||||
/// [`ErrorType`]: nzr-api::error::ErrorType
|
||||
fn to_api_with_type(self, err_type: ErrorType, context: impl AsRef<str>)
|
||||
-> Result<T, ApiError>;
|
||||
/// Converts the result's error type to [`ApiError`] with the given
|
||||
/// [`ErrorType`].
|
||||
///
|
||||
/// [`ApiError`]: nzr-api::error::ApiError
|
||||
/// [`ErrorType`]: nzr-api::error::ErrorType
|
||||
fn to_api_type(self, err_type: ErrorType) -> Result<T, ApiError>;
|
||||
}
|
||||
|
||||
impl<T, E> ToApiResult<T> for Result<T, E>
|
||||
where
|
||||
E: std::error::Error + 'static,
|
||||
{
|
||||
fn to_api(self) -> Result<T, ApiError> {
|
||||
self.map_err(|e| ApiError {
|
||||
error_type: ErrorType::Other,
|
||||
message: None,
|
||||
inner: Some(e.into()),
|
||||
})
|
||||
}
|
||||
|
||||
fn to_api_with(self, context: impl AsRef<str>) -> Result<T, ApiError> {
|
||||
self.map_err(|e| ApiError {
|
||||
error_type: ErrorType::Other,
|
||||
message: Some(context.as_ref().to_owned()),
|
||||
inner: Some(e.into()),
|
||||
})
|
||||
}
|
||||
|
||||
fn to_api_type(self, err_type: ErrorType) -> Result<T, ApiError> {
|
||||
self.map_err(|e| ApiError {
|
||||
error_type: err_type,
|
||||
message: None,
|
||||
inner: Some(e.into()),
|
||||
})
|
||||
}
|
||||
|
||||
fn to_api_with_type(
|
||||
self,
|
||||
err_type: ErrorType,
|
||||
context: impl AsRef<str>,
|
||||
) -> Result<T, ApiError> {
|
||||
self.map_err(|e| ApiError {
|
||||
error_type: err_type,
|
||||
message: Some(context.as_ref().to_owned()),
|
||||
inner: Some(e.into()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Simplify<T> {
|
||||
fn simplify(self) -> Result<T, ApiError>;
|
||||
}
|
||||
|
||||
impl<T> Simplify<T> for Result<Result<T, ApiError>, RpcError> {
|
||||
/// Flattens a Result of `RpcError` and `ApiError` to just `ApiError`.
|
||||
fn simplify(self) -> Result<T, ApiError> {
|
||||
self.to_api_with("RPC Error").and_then(|r| r)
|
||||
}
|
||||
}
|
|
@ -1,53 +0,0 @@
|
|||
use std::{pin::Pin, task::Poll};
|
||||
|
||||
use futures::{Stream, TryStreamExt};
|
||||
use tarpc::tokio_util::codec::{FramedRead, LengthDelimitedCodec};
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
use super::{EventError, EventMessage};
|
||||
|
||||
/// Client for receiving various events emitted by Nazrin.
|
||||
pub struct EventClient<T>
|
||||
where
|
||||
T: AsyncRead,
|
||||
{
|
||||
transport: Pin<Box<FramedRead<T, LengthDelimitedCodec>>>,
|
||||
}
|
||||
|
||||
impl<T> EventClient<T>
|
||||
where
|
||||
T: AsyncRead,
|
||||
{
|
||||
/// Creates a new EventClient.
|
||||
pub fn new(inner: T) -> Self {
|
||||
let transport = FramedRead::new(inner, LengthDelimitedCodec::new());
|
||||
Self {
|
||||
transport: Box::pin(transport),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for EventClient<T>
|
||||
where
|
||||
T: AsyncRead,
|
||||
{
|
||||
type Item = Result<EventMessage, EventError>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
match self.as_mut().transport.try_poll_next_unpin(cx) {
|
||||
Poll::Ready(res) => {
|
||||
let our_res = res.map(|res| {
|
||||
res.map_err(|e| e.into()).and_then(|bytes| {
|
||||
let msg: EventMessage = serde_json::from_slice(&bytes)?;
|
||||
Ok(msg)
|
||||
})
|
||||
});
|
||||
Poll::Ready(our_res)
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,77 +0,0 @@
|
|||
pub mod client;
|
||||
pub mod server;
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
|
||||
use std::io;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::model;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum ResourceAction {
|
||||
/// The referenced resource was created.
|
||||
Created,
|
||||
/// The referenced resource was deleted, and is no longer available.
|
||||
Deleted,
|
||||
/// The referenced resource was modified in some way.
|
||||
Modified,
|
||||
}
|
||||
|
||||
/// Represents an event pertaining to a specific action.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct ResourceEvent<T> {
|
||||
pub action: ResourceAction,
|
||||
/// The entity that was acted upon.
|
||||
pub entity: T,
|
||||
}
|
||||
|
||||
/// Represents any event that is emitted by Nazrin.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(tag = "event")]
|
||||
pub enum EventMessage {
|
||||
/// A subnet was created, modified, or deleted.
|
||||
Subnet(ResourceEvent<model::Subnet>),
|
||||
/// An instance was created, modified, or deleted.
|
||||
Instance(ResourceEvent<model::Instance>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum EventError {
|
||||
#[error("Transport error: {0}")]
|
||||
Transport(#[from] io::Error),
|
||||
#[error("Serialization error: {0}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
}
|
||||
|
||||
pub trait Emittable {
|
||||
fn as_event(&self, action: ResourceAction) -> EventMessage;
|
||||
}
|
||||
|
||||
macro_rules! emittable {
|
||||
($t:ty, $msg:ident) => {
|
||||
impl Emittable for $t {
|
||||
fn as_event(&self, action: ResourceAction) -> EventMessage {
|
||||
EventMessage::$msg(ResourceEvent {
|
||||
action,
|
||||
entity: self.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
emittable!(model::Instance, Instance);
|
||||
emittable!(model::Subnet, Subnet);
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! nzr_event {
|
||||
($srv:expr, $act:ident, $ent:tt) => {{
|
||||
use $crate::event::Emittable;
|
||||
|
||||
$srv.emit($ent.as_event($crate::event::ResourceAction::$act))
|
||||
.await
|
||||
}};
|
||||
}
|
|
@ -1,191 +0,0 @@
|
|||
use futures::Future;
|
||||
use std::{fmt, io, pin::Pin};
|
||||
|
||||
use futures::SinkExt;
|
||||
use tarpc::tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
|
||||
use tokio::{
|
||||
io::AsyncWrite,
|
||||
sync::broadcast::{self, Receiver, Sender},
|
||||
};
|
||||
use tracing::instrument;
|
||||
|
||||
use super::EventMessage;
|
||||
|
||||
/// Representation of multiple types of SocketAddrs, because you can't have just
|
||||
/// one!
|
||||
#[derive(Debug)]
|
||||
pub enum SocketAddr {
|
||||
#[cfg(unix)]
|
||||
TokioUnix(tokio::net::unix::SocketAddr),
|
||||
#[cfg(unix)]
|
||||
Unix(std::os::unix::net::SocketAddr),
|
||||
Net(std::net::SocketAddr),
|
||||
#[cfg(test)]
|
||||
None,
|
||||
}
|
||||
|
||||
impl fmt::Display for SocketAddr {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
#[cfg(unix)]
|
||||
Self::TokioUnix(addr) => std::fmt::Debug::fmt(addr, f),
|
||||
#[cfg(unix)]
|
||||
Self::Unix(addr) => std::fmt::Debug::fmt(addr, f),
|
||||
Self::Net(addr) => addr.fmt(f),
|
||||
#[cfg(test)]
|
||||
Self::None => write!(f, "mock client"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! as_sockaddr {
|
||||
($id:ident, $t:ty) => {
|
||||
impl From<$t> for SocketAddr {
|
||||
fn from(value: $t) -> Self {
|
||||
Self::$id(value)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
as_sockaddr!(TokioUnix, tokio::net::unix::SocketAddr);
|
||||
#[cfg(unix)]
|
||||
as_sockaddr!(Unix, std::os::unix::net::SocketAddr);
|
||||
as_sockaddr!(Net, std::net::SocketAddr);
|
||||
|
||||
/// Represents a connection to a client. Instead of being owned by the server
|
||||
/// struct, a [`tokio::sync::broadcast::Receiver`] is used to get the serialized
|
||||
/// message and pass it to the client.
|
||||
///
|
||||
/// [`tokio::sync::broadcast::Receiver`]: tokio::sync::broadcast::Receiver
|
||||
struct EventEmitter<T>
|
||||
where
|
||||
T: AsyncWrite + Send + 'static,
|
||||
{
|
||||
transport: Pin<Box<FramedWrite<T, LengthDelimitedCodec>>>,
|
||||
client_addr: SocketAddr,
|
||||
channel: Receiver<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl<T> EventEmitter<T>
|
||||
where
|
||||
T: AsyncWrite + Send + 'static,
|
||||
{
|
||||
fn new(inner: T, client_addr: SocketAddr, channel: Receiver<Vec<u8>>) -> Self {
|
||||
let transport = FramedWrite::new(inner, LengthDelimitedCodec::new());
|
||||
Self {
|
||||
transport: Box::pin(transport),
|
||||
client_addr,
|
||||
channel,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self), fields(client = %self.client_addr))]
|
||||
async fn handler(&mut self) -> bool {
|
||||
match self.channel.recv().await {
|
||||
Ok(msg) => {
|
||||
if let Err(err) = self.transport.send(msg.into()).await {
|
||||
tracing::error!("Couldn't write to client: {err}");
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("IPC error: {err}");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn run(mut self) {
|
||||
tokio::spawn(async move { while self.handler().await {} });
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles the creation and sending of events to clients.
|
||||
pub struct EventServer {
|
||||
channel: Sender<Vec<u8>>,
|
||||
}
|
||||
|
||||
// TODO: consider letting this be configurable
|
||||
const MAX_RECEIVERS: usize = 16;
|
||||
|
||||
impl EventServer {
|
||||
/// Creates a new EventServer.
|
||||
pub fn new() -> Self {
|
||||
let (channel, _) = broadcast::channel(MAX_RECEIVERS);
|
||||
Self { channel }
|
||||
}
|
||||
|
||||
/// Returns a future that returns [`Poll::Pending`] until the client count falls below the threshold.
|
||||
pub fn until_available(&self) -> EventServerAvailability<'_> {
|
||||
EventServerAvailability { parent: self }
|
||||
}
|
||||
|
||||
/// Whether we're able to take connections.
|
||||
#[inline]
|
||||
fn is_available(&self) -> bool {
|
||||
self.channel.receiver_count() < MAX_RECEIVERS
|
||||
}
|
||||
|
||||
/// Spawns a new [`EventEmitter`] where events will be sent to.
|
||||
pub async fn spawn<T: AsyncWrite + Send + 'static>(
|
||||
&self,
|
||||
inner: T,
|
||||
client_addr: impl Into<SocketAddr>,
|
||||
) -> io::Result<()> {
|
||||
// Sender<T> doesn't have a try_subscribe, so this is our last-ditch
|
||||
// effort to avoid a panic
|
||||
if !self.is_available() {
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "Too many connections"));
|
||||
}
|
||||
|
||||
EventEmitter::new(inner, client_addr.into(), self.channel.subscribe()).run();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send the given event to all connected clients.
|
||||
pub async fn emit(&self, msg: EventMessage) {
|
||||
let bytes = match serde_json::to_vec(&msg) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(err) => {
|
||||
tracing::error!("Failed to serialize: {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if self.channel.send(bytes).is_err() {
|
||||
tracing::debug!("Tried to emit an event, but no clients were around to hear it");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EventServer {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EventServerAvailability<'a> {
|
||||
parent: &'a EventServer,
|
||||
}
|
||||
|
||||
impl<'a> Future for EventServerAvailability<'a> {
|
||||
type Output = ();
|
||||
|
||||
fn poll(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
use std::task::Poll;
|
||||
|
||||
if self.parent.is_available() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,43 +0,0 @@
|
|||
use std::str::FromStr;
|
||||
|
||||
use futures::StreamExt;
|
||||
|
||||
use crate::{
|
||||
event::{server::SocketAddr, EventMessage, ResourceAction},
|
||||
net::cidr::CidrV4,
|
||||
nzr_event,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn event_serde() {
|
||||
let (rx, tx) = tokio::io::duplex(1024);
|
||||
let server = super::server::EventServer::default();
|
||||
server.spawn(tx, SocketAddr::None).await.unwrap();
|
||||
let mut client = super::client::EventClient::new(rx);
|
||||
let net = CidrV4::from_str("192.0.2.0/24").unwrap();
|
||||
let some_subnet = crate::model::Subnet {
|
||||
name: "whatever".into(),
|
||||
data: crate::model::SubnetData {
|
||||
ifname: "eth0".into(),
|
||||
network: net,
|
||||
start_host: net.make_ip(10).unwrap(),
|
||||
end_host: net.make_ip(254).unwrap(),
|
||||
gateway4: Some(net.make_ip(1).unwrap()),
|
||||
dns: Vec::new(),
|
||||
domain_name: Some("homestarrunnner.net".parse().unwrap()),
|
||||
vlan_id: None,
|
||||
},
|
||||
};
|
||||
nzr_event!(server, Created, some_subnet);
|
||||
let next = client
|
||||
.next()
|
||||
.await
|
||||
.expect("client must receive message")
|
||||
.unwrap();
|
||||
|
||||
let EventMessage::Subnet(net) = next else {
|
||||
panic!("Unexpected event received: {next:?}");
|
||||
};
|
||||
|
||||
assert_eq!(net.action, ResourceAction::Created);
|
||||
}
|
|
@ -1,12 +1,9 @@
|
|||
use std::net::Ipv4Addr;
|
||||
|
||||
use error::ApiError;
|
||||
use model::{CreateStatus, Instance, SshPubkey, Subnet};
|
||||
|
||||
pub mod args;
|
||||
pub mod config;
|
||||
pub mod error;
|
||||
pub mod event;
|
||||
#[cfg(feature = "mock")]
|
||||
pub mod mock;
|
||||
pub mod model;
|
||||
|
@ -26,40 +23,40 @@ pub enum InstanceQuery {
|
|||
#[tarpc::service]
|
||||
pub trait Nazrin {
|
||||
/// Creates a new instance.
|
||||
async fn new_instance(build_args: args::NewInstance) -> Result<uuid::Uuid, ApiError>;
|
||||
async fn new_instance(build_args: args::NewInstance) -> Result<uuid::Uuid, String>;
|
||||
/// Poll for the current status of an instance being created.
|
||||
async fn poll_new_instance(task_id: uuid::Uuid) -> Option<CreateStatus>;
|
||||
/// Deletes an existing instance.
|
||||
///
|
||||
/// This should involve deleting all related disks and clearing
|
||||
/// the lease information from the subnet data, if any.
|
||||
async fn delete_instance(name: String) -> Result<(), ApiError>;
|
||||
async fn delete_instance(name: String) -> Result<(), String>;
|
||||
/// Gets a single instance by the given InstanceQuery.
|
||||
async fn find_instance(query: InstanceQuery) -> Result<Option<Instance>, ApiError>;
|
||||
async fn find_instance(query: InstanceQuery) -> Result<Option<Instance>, String>;
|
||||
/// Gets a list of existing instances.
|
||||
async fn get_instances(with_status: bool) -> Result<Vec<Instance>, ApiError>;
|
||||
async fn get_instances(with_status: bool) -> Result<Vec<Instance>, String>;
|
||||
/// Cleans up unusable entries in the database.
|
||||
async fn garbage_collect() -> Result<(), ApiError>;
|
||||
async fn garbage_collect() -> Result<(), String>;
|
||||
/// Creates a new subnet.
|
||||
///
|
||||
/// Unlike instances, subnets shouldn't perform any changes to the
|
||||
/// interfaces they reference. This should be used primarily for
|
||||
/// ease-of-use and bookkeeping (e.g., assigning dynamic leases).
|
||||
async fn new_subnet(build_args: Subnet) -> Result<Subnet, ApiError>;
|
||||
async fn new_subnet(build_args: Subnet) -> Result<Subnet, String>;
|
||||
/// Modifies an existing subnet.
|
||||
async fn modify_subnet(edit_args: Subnet) -> Result<Subnet, ApiError>;
|
||||
async fn modify_subnet(edit_args: Subnet) -> Result<Subnet, String>;
|
||||
/// Gets a list of existing subnets.
|
||||
async fn get_subnets() -> Result<Vec<Subnet>, ApiError>;
|
||||
async fn get_subnets() -> Result<Vec<Subnet>, String>;
|
||||
/// Deletes an existing subnet.
|
||||
async fn delete_subnet(interface: String) -> Result<(), ApiError>;
|
||||
async fn delete_subnet(interface: String) -> Result<(), String>;
|
||||
/// Gets the cloud-init user-data for the given instance.
|
||||
async fn get_instance_userdata(id: i32) -> Result<Vec<u8>, ApiError>;
|
||||
async fn get_instance_userdata(id: i32) -> Result<Vec<u8>, String>;
|
||||
/// Gets all SSH keys stored in the database.
|
||||
async fn get_ssh_pubkeys() -> Result<Vec<SshPubkey>, ApiError>;
|
||||
async fn get_ssh_pubkeys() -> Result<Vec<SshPubkey>, String>;
|
||||
/// Adds a new SSH public key to the database.
|
||||
async fn add_ssh_pubkey(pub_key: String) -> Result<SshPubkey, ApiError>;
|
||||
async fn add_ssh_pubkey(pub_key: String) -> Result<SshPubkey, String>;
|
||||
/// Deletes an SSH public key from the database.
|
||||
async fn delete_ssh_pubkey(id: i32) -> Result<(), ApiError>;
|
||||
async fn delete_ssh_pubkey(id: i32) -> Result<(), String>;
|
||||
}
|
||||
|
||||
/// Create a new NazrinClient.
|
||||
|
|
|
@ -1,20 +1,20 @@
|
|||
use std::net::Ipv4Addr;
|
||||
|
||||
use crate::{args, error::ApiError, model, net::cidr::CidrV4};
|
||||
use crate::{args, model, net::cidr::CidrV4};
|
||||
|
||||
pub trait NzrClientExt {
|
||||
#[allow(async_fn_in_trait)]
|
||||
async fn new_mock_instance(
|
||||
&mut self,
|
||||
name: impl AsRef<str>,
|
||||
) -> Result<Result<model::Instance, ApiError>, crate::RpcError>;
|
||||
) -> Result<Result<model::Instance, String>, crate::RpcError>;
|
||||
}
|
||||
|
||||
impl NzrClientExt for crate::NazrinClient {
|
||||
async fn new_mock_instance(
|
||||
&mut self,
|
||||
name: impl AsRef<str>,
|
||||
) -> Result<Result<model::Instance, ApiError>, crate::RpcError> {
|
||||
) -> Result<Result<model::Instance, String>, crate::RpcError> {
|
||||
let name = name.as_ref().to_owned();
|
||||
|
||||
let subnet = self
|
||||
|
|
|
@ -10,7 +10,6 @@ use futures::{future, StreamExt};
|
|||
use tokio::{sync::RwLock, task::JoinHandle};
|
||||
|
||||
use crate::{
|
||||
error::{ApiError, ErrorType},
|
||||
model,
|
||||
net::{cidr::CidrV4, mac::MacAddr},
|
||||
InstanceQuery, Nazrin, NazrinClient,
|
||||
|
@ -63,14 +62,14 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
build_args: crate::args::NewInstance,
|
||||
) -> Result<uuid::Uuid, ApiError> {
|
||||
) -> Result<uuid::Uuid, String> {
|
||||
let mut db = self.db.write().await;
|
||||
let Some(net_pos) = db
|
||||
.subnets
|
||||
.iter()
|
||||
.position(|s| s.as_ref().filter(|s| s.name == build_args.subnet).is_some())
|
||||
else {
|
||||
return Err("Subnet doesn't exist".into());
|
||||
return Err("Subnet doesn't exist".to_owned());
|
||||
};
|
||||
let subnet = db.subnets[net_pos].as_ref().unwrap().clone();
|
||||
let cur_lease = *(db
|
||||
|
@ -135,11 +134,7 @@ impl Nazrin for MockServer {
|
|||
}
|
||||
}
|
||||
|
||||
async fn delete_instance(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
name: String,
|
||||
) -> Result<(), ApiError> {
|
||||
async fn delete_instance(self, _: tarpc::context::Context, name: String) -> Result<(), String> {
|
||||
let mut db = self.db.write().await;
|
||||
let Some(inst) = db
|
||||
.instances
|
||||
|
@ -147,7 +142,7 @@ impl Nazrin for MockServer {
|
|||
.find(|i| i.as_ref().filter(|i| i.name == name).is_some())
|
||||
.take()
|
||||
else {
|
||||
return Err("Instance doesn't exist".into());
|
||||
return Err("Instance doesn't exist".to_owned());
|
||||
};
|
||||
inst.take();
|
||||
Ok(())
|
||||
|
@ -157,7 +152,7 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
query: crate::InstanceQuery,
|
||||
) -> Result<Option<crate::model::Instance>, ApiError> {
|
||||
) -> Result<Option<crate::model::Instance>, String> {
|
||||
let db = self.db.read().await;
|
||||
|
||||
let res = {
|
||||
|
@ -182,7 +177,7 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
id: i32,
|
||||
) -> Result<Vec<u8>, ApiError> {
|
||||
) -> Result<Vec<u8>, String> {
|
||||
let db = self.db.read().await;
|
||||
let Some(inst) = db
|
||||
.instances
|
||||
|
@ -190,7 +185,7 @@ impl Nazrin for MockServer {
|
|||
.find(|i| i.as_ref().map(|i| i.id == id).is_some())
|
||||
.and_then(|o| o.as_ref())
|
||||
else {
|
||||
return Err("No such instance".into());
|
||||
return Err("No such instance".to_owned());
|
||||
};
|
||||
Ok(db.ci_userdatas.get(&inst.name).cloned().unwrap_or_default())
|
||||
}
|
||||
|
@ -199,7 +194,7 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
_with_status: bool,
|
||||
) -> Result<Vec<crate::model::Instance>, ApiError> {
|
||||
) -> Result<Vec<crate::model::Instance>, String> {
|
||||
let db = self.db.read().await;
|
||||
Ok(db
|
||||
.instances
|
||||
|
@ -212,7 +207,7 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
build_args: crate::model::Subnet,
|
||||
) -> Result<crate::model::Subnet, ApiError> {
|
||||
) -> Result<crate::model::Subnet, String> {
|
||||
let mut db = self.db.write().await;
|
||||
let subnet = build_args.clone();
|
||||
db.subnets.push(Some(build_args));
|
||||
|
@ -223,14 +218,14 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
_edit_args: crate::model::Subnet,
|
||||
) -> Result<crate::model::Subnet, ApiError> {
|
||||
) -> Result<crate::model::Subnet, String> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn get_subnets(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
) -> Result<Vec<crate::model::Subnet>, ApiError> {
|
||||
) -> Result<Vec<crate::model::Subnet>, String> {
|
||||
let db = self.db.read().await;
|
||||
Ok(db.subnets.iter().filter_map(|net| net.clone()).collect())
|
||||
}
|
||||
|
@ -239,40 +234,35 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
interface: String,
|
||||
) -> Result<(), ApiError> {
|
||||
) -> Result<(), String> {
|
||||
let mut db = self.db.write().await;
|
||||
{
|
||||
let Some(subnet) = db
|
||||
.subnets
|
||||
.iter_mut()
|
||||
.find(|net| net.as_ref().filter(|n| n.name == interface).is_some())
|
||||
else {
|
||||
return Err(ErrorType::NotFound.into());
|
||||
};
|
||||
subnet.take();
|
||||
}
|
||||
// Drop all instances that belong to this subnet
|
||||
db.instances.iter_mut().for_each(|inst| {
|
||||
if inst
|
||||
.as_mut()
|
||||
.filter(|inst| inst.lease.subnet != interface)
|
||||
.is_some()
|
||||
{
|
||||
inst.take();
|
||||
}
|
||||
});
|
||||
db.instances
|
||||
.iter()
|
||||
.filter_map(|inst| inst.as_ref())
|
||||
.for_each(|inst| {
|
||||
if inst.lease.subnet == interface {
|
||||
todo!("what now")
|
||||
}
|
||||
});
|
||||
let Some(subnet) = db
|
||||
.subnets
|
||||
.iter_mut()
|
||||
.find(|net| net.as_ref().filter(|n| n.name == interface).is_some())
|
||||
else {
|
||||
return Err("Subnet doesn't exist".to_owned());
|
||||
};
|
||||
subnet.take();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), ApiError> {
|
||||
// no libvirt to compare against, no instances to GC
|
||||
Ok(())
|
||||
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), String> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn get_ssh_pubkeys(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
) -> Result<Vec<model::SshPubkey>, ApiError> {
|
||||
) -> Result<Vec<model::SshPubkey>, String> {
|
||||
let db = self.db.read().await;
|
||||
|
||||
Ok(db
|
||||
|
@ -286,7 +276,7 @@ impl Nazrin for MockServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
pub_key: String,
|
||||
) -> Result<model::SshPubkey, ApiError> {
|
||||
) -> Result<model::SshPubkey, String> {
|
||||
let mut key_model = model::SshPubkey::from_str(&pub_key).map_err(|e| e.to_string())?;
|
||||
let mut db = self.db.write().await;
|
||||
key_model.id = Some(db.ssh_keys.len() as i32);
|
||||
|
@ -294,7 +284,7 @@ impl Nazrin for MockServer {
|
|||
Ok(key_model)
|
||||
}
|
||||
|
||||
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), ApiError> {
|
||||
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), String> {
|
||||
let mut db = self.db.write().await;
|
||||
if let Some(key) = db.ssh_keys.get_mut(id as usize) {
|
||||
key.take();
|
||||
|
|
|
@ -5,10 +5,7 @@ use serde::{Deserialize, Serialize};
|
|||
use std::{fmt, net::Ipv4Addr};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::{
|
||||
error::ApiError,
|
||||
net::{cidr::CidrV4, mac::MacAddr},
|
||||
};
|
||||
use crate::net::{cidr::CidrV4, mac::MacAddr};
|
||||
|
||||
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
|
||||
#[repr(u32)]
|
||||
|
@ -70,7 +67,7 @@ impl fmt::Display for DomainState {
|
|||
pub struct CreateStatus {
|
||||
pub status_text: String,
|
||||
pub completion: f32,
|
||||
pub result: Option<Result<Instance, ApiError>>,
|
||||
pub result: Option<Result<Instance, String>>,
|
||||
}
|
||||
|
||||
/// Struct representing a VM instance.
|
||||
|
|
|
@ -26,8 +26,9 @@ tarpc = { version = "0.34", features = [
|
|||
] }
|
||||
|
||||
# Logging
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
# TODO: switch to tracing?
|
||||
log = "0.4.17"
|
||||
syslog = "7"
|
||||
|
||||
# Database
|
||||
diesel = { version = "2.2", features = [
|
||||
|
|
|
@ -1 +1,23 @@
|
|||
pub mod net;
|
||||
pub mod vm;
|
||||
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CommandError(String);
|
||||
|
||||
impl fmt::Display for CommandError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for CommandError {}
|
||||
|
||||
macro_rules! cmd_error {
|
||||
($($arg:tt)*) => {
|
||||
Box::new(CommandError(format!($($arg)*)))
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) use cmd_error;
|
||||
|
|
47
nzrd/src/cmd/net.rs
Normal file
47
nzrd/src/cmd/net.rs
Normal file
|
@ -0,0 +1,47 @@
|
|||
use super::*;
|
||||
use crate::ctx::Context;
|
||||
use crate::model::tx::Transaction;
|
||||
use crate::model::Subnet;
|
||||
use nzr_api::model;
|
||||
|
||||
pub async fn add_subnet(
|
||||
ctx: &Context,
|
||||
args: model::Subnet,
|
||||
) -> Result<Subnet, Box<dyn std::error::Error>> {
|
||||
let subnet = {
|
||||
let s = Subnet::insert(ctx, args.name, args.data)
|
||||
.await
|
||||
.map_err(|er| cmd_error!("Couldn't generate subnet: {}", er))?;
|
||||
Transaction::begin(ctx, s)
|
||||
};
|
||||
|
||||
if let Err(err) = ctx.zones.new_zone(&subnet).await {
|
||||
Err(cmd_error!("Failed to create new DNS zone: {}", err))
|
||||
} else {
|
||||
Ok(subnet.take())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_subnet(
|
||||
ctx: &Context,
|
||||
name: impl AsRef<str>,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
match Subnet::get_by_name(ctx, name.as_ref())
|
||||
.await
|
||||
.map_err(|er| cmd_error!("Couldn't find subnet: {}", er))?
|
||||
{
|
||||
Some(subnet) => {
|
||||
if let Some(domain_name) = &subnet.domain_name {
|
||||
ctx.zones.delete_zone(domain_name).await;
|
||||
}
|
||||
|
||||
subnet
|
||||
.delete(ctx)
|
||||
.await
|
||||
.map_err(|er| cmd_error!("Couldn't fully delete subnet entry: {}", er))
|
||||
}
|
||||
None => Err(cmd_error!("Subnet not found")),
|
||||
}?;
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -1,4 +1,3 @@
|
|||
use nzr_api::error::{ApiError, ErrorType, ToApiResult};
|
||||
use nzr_api::net::cidr::CidrV4;
|
||||
use nzr_virt::error::DomainError;
|
||||
use nzr_virt::xml::build::DomainBuilder;
|
||||
|
@ -6,14 +5,14 @@ use nzr_virt::xml::{self, InfoMap, SerialType, Sysinfo};
|
|||
use nzr_virt::{datasize, dom, vol};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::*;
|
||||
use crate::ctrl::vm::Progress;
|
||||
use crate::ctx::Context;
|
||||
use crate::model::tx::Transaction;
|
||||
use crate::model::{Instance, Subnet};
|
||||
use log::{debug, info, warn};
|
||||
use nzr_api::args;
|
||||
use nzr_api::net::mac::MacAddr;
|
||||
use nzr_api::{args, model, nzr_event};
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
const VIRT_MAC_OUI: &[u8] = &[0x02, 0xf1, 0x0f];
|
||||
|
||||
|
@ -30,21 +29,23 @@ pub async fn new_instance(
|
|||
ctx: Context,
|
||||
prog_task: Arc<RwLock<Progress>>,
|
||||
args: &args::NewInstance,
|
||||
) -> Result<(Instance, dom::Domain), ApiError> {
|
||||
) -> Result<(Instance, dom::Domain), Box<dyn std::error::Error>> {
|
||||
progress!(prog_task, 0.0, "Starting...");
|
||||
// find the subnet corresponding to the interface
|
||||
let subnet = Subnet::get_by_name(&ctx, &args.subnet)
|
||||
.await
|
||||
.to_api_with("Unable to get interface")?
|
||||
.ok_or::<ApiError>(format!("Subnet {} wasn't found in database", &args.subnet).into())?;
|
||||
.map_err(|er| cmd_error!("Unable to get interface: {}", er))?
|
||||
.ok_or(cmd_error!(
|
||||
"Subnet {} wasn't found in database",
|
||||
&args.subnet
|
||||
))?;
|
||||
|
||||
// bail if a domain already exists
|
||||
if let Ok(dom) = ctx.virt.conn.get_instance(&args.name).await {
|
||||
Err(format!(
|
||||
Err(cmd_error!(
|
||||
"Domain with name already exists (uuid {})",
|
||||
dom.xml().await.uuid,
|
||||
)
|
||||
.into())
|
||||
))
|
||||
} else {
|
||||
// make sure the base image exists
|
||||
let mut base_image = ctx
|
||||
|
@ -53,7 +54,7 @@ pub async fn new_instance(
|
|||
.baseimg
|
||||
.volume(&args.base_image)
|
||||
.await
|
||||
.to_api_with("Couldn't find base image")?;
|
||||
.map_err(|er| cmd_error!("Couldn't find base image: {}", er))?;
|
||||
progress!(prog_task, 10.0, "Generating metadata...");
|
||||
|
||||
// generate a new lease with a new MAC addr
|
||||
|
@ -61,23 +62,19 @@ pub async fn new_instance(
|
|||
let bytes = [VIRT_MAC_OUI, rand::random::<[u8; 3]>().as_ref()].concat();
|
||||
MacAddr::from_bytes(bytes)
|
||||
}
|
||||
.to_api_with("Unable to create a new MAC address")?;
|
||||
.map_err(|er| cmd_error!("Unable to create a new MAC address: {}", er))?;
|
||||
|
||||
// Get highest host addr + 1 for our new addr
|
||||
let addr = {
|
||||
let addr_num = Instance::all_in_subnet(&ctx, &subnet)
|
||||
.await
|
||||
.to_api_with("Couldn't get instances in subnet")?
|
||||
.await?
|
||||
.into_iter()
|
||||
.max_by(|a, b| a.host_num.cmp(&b.host_num))
|
||||
.map_or(subnet.start_host, |i| i.host_num + 1);
|
||||
if addr_num > subnet.end_host || addr_num < subnet.start_host {
|
||||
return Err("Got invalid lease address for instance".into());
|
||||
Err(cmd_error!("Got invalid lease address for instance"))?;
|
||||
}
|
||||
let addr = subnet
|
||||
.network
|
||||
.make_ip(addr_num as u32)
|
||||
.to_api_with("Unable to generate instance IP")?;
|
||||
let addr = subnet.network.make_ip(addr_num as u32)?;
|
||||
CidrV4::new(addr, subnet.network.cidr())
|
||||
};
|
||||
|
||||
|
@ -88,12 +85,7 @@ pub async fn new_instance(
|
|||
};
|
||||
|
||||
// generate cloud-init data
|
||||
let db_inst = {
|
||||
let inst = Instance::insert(&ctx, &args.name, &subnet, lease.clone(), None)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?;
|
||||
Transaction::begin(&ctx, inst)
|
||||
};
|
||||
let db_inst = Instance::insert(&ctx, &args.name, &subnet, lease.clone(), None).await?;
|
||||
|
||||
progress!(prog_task, 30.0, "Creating instance images...");
|
||||
// create primary volume from base image
|
||||
|
@ -104,7 +96,7 @@ pub async fn new_instance(
|
|||
datasize!((args.disk_sizes.0) GiB),
|
||||
)
|
||||
.await
|
||||
.to_api_with("Failed to clone base image")?;
|
||||
.map_err(|er| cmd_error!("Failed to clone base image: {}", er))?;
|
||||
|
||||
// and, if it exists: the second volume
|
||||
let sec_vol = match args.disk_sizes.1 {
|
||||
|
@ -112,11 +104,7 @@ pub async fn new_instance(
|
|||
let voldata =
|
||||
// TODO: Fix VolType
|
||||
xml::Volume::new(&args.name, xml::VolType::Qcow2, datasize!(sec_size GiB));
|
||||
Some(
|
||||
vol::Volume::create(&ctx.virt.pools.secondary, voldata, 0)
|
||||
.await
|
||||
.to_api_with("Couldn't create secondary volume")?,
|
||||
)
|
||||
Some(vol::Volume::create(&ctx.virt.pools.secondary, voldata, 0).await?)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
@ -180,20 +168,15 @@ pub async fn new_instance(
|
|||
.build()
|
||||
};
|
||||
|
||||
let mut virt_dom = ctx
|
||||
.virt
|
||||
.conn
|
||||
.define_instance(dom_xml)
|
||||
.await
|
||||
.to_api_with("Couldn't define libvirt instance")?;
|
||||
let mut virt_dom = ctx.virt.conn.define_instance(dom_xml).await?;
|
||||
|
||||
// not a fatal error, we can set autostart afterward
|
||||
if let Err(err) = virt_dom.autostart(true).await {
|
||||
warn!("Couldn't set autostart for domain: {err}");
|
||||
if let Err(er) = virt_dom.autostart(true).await {
|
||||
warn!("Couldn't set autostart for domain: {}", er);
|
||||
}
|
||||
|
||||
if let Err(err) = virt_dom.start().await {
|
||||
warn!("Domain defined, but couldn't be started! Error: {err}");
|
||||
if let Err(er) = virt_dom.start().await {
|
||||
warn!("Domain defined, but couldn't be started! Error: {}", er);
|
||||
}
|
||||
|
||||
// set all volumes to persistent to avoid deletion
|
||||
|
@ -205,76 +188,40 @@ pub async fn new_instance(
|
|||
|
||||
progress!(prog_task, 80.0, "Domain created!");
|
||||
debug!("Domain {} created!", virt_dom.xml().await.name.as_str());
|
||||
Ok((db_inst.take(), virt_dom))
|
||||
Ok((db_inst, virt_dom))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_instance(
|
||||
ctx: Context,
|
||||
name: String,
|
||||
) -> Result<Option<model::Instance>, ApiError> {
|
||||
let Some(inst_db) = Instance::get_by_name(&ctx, &name)
|
||||
.await
|
||||
.to_api_with_type(ErrorType::Database, "Couldn't find instance")?
|
||||
else {
|
||||
return Err(ErrorType::NotFound.into());
|
||||
};
|
||||
let api_model = match inst_db.api_model(&ctx).await {
|
||||
Ok(model) => Some(model),
|
||||
Err(err) => {
|
||||
warn!("Couldn't get API model to notify clients: {err}");
|
||||
None
|
||||
}
|
||||
pub async fn delete_instance(ctx: Context, name: String) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let Some(inst_db) = Instance::get_by_name(&ctx, &name).await? else {
|
||||
return Err(cmd_error!("Instance {name} not found"));
|
||||
};
|
||||
// First, destroy the instance
|
||||
match ctx.virt.conn.get_instance(name.clone()).await {
|
||||
Ok(mut inst) => {
|
||||
inst.stop().await.to_api_with("Couldn't stop instance")?;
|
||||
inst.undefine(true)
|
||||
.await
|
||||
.to_api_with("Couldn't undefine instance")?;
|
||||
inst.stop().await?;
|
||||
inst.undefine(true).await?;
|
||||
}
|
||||
Err(DomainError::DomainNotFound) => {
|
||||
warn!("Deleting instance that exists in DB but not libvirt");
|
||||
}
|
||||
Err(err) => Err(ApiError::new(
|
||||
nzr_api::error::ErrorType::VirtError,
|
||||
"Couldn't get instance from libvirt",
|
||||
err,
|
||||
))?,
|
||||
Err(err) => Err(err)?,
|
||||
}
|
||||
// Then, delete the DB entity
|
||||
inst_db
|
||||
.delete(&ctx)
|
||||
.await
|
||||
.to_api_with("Couldn't delete from database")?;
|
||||
inst_db.delete(&ctx).await?;
|
||||
|
||||
Ok(api_model)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete all instances that don't have a matching libvirt domain
|
||||
pub async fn prune_instances(ctx: &Context) -> Result<(), Box<dyn std::error::Error>> {
|
||||
for entity in Instance::all(ctx).await? {
|
||||
if let Err(DomainError::DomainNotFound) = ctx.virt.conn.get_instance(&entity.name).await {
|
||||
info!("Invalid domain {}, deleting", &entity.name);
|
||||
// First, get the API model to notify clients with
|
||||
let api_model = match entity.api_model(ctx).await {
|
||||
Ok(ent) => Some(ent),
|
||||
Err(err) => {
|
||||
warn!("Couldn't get api model to notify clients: {err}");
|
||||
None
|
||||
if let Err(err) = ctx.virt.conn.get_instance(&entity.name).await {
|
||||
if err == DomainError::DomainNotFound {
|
||||
info!("Invalid domain {}, deleting", &entity.name);
|
||||
let name = entity.name.clone();
|
||||
if let Err(err) = entity.delete(ctx).await {
|
||||
warn!("Couldn't delete {}: {}", name, err);
|
||||
}
|
||||
};
|
||||
|
||||
// then, delete by name
|
||||
let name = entity.name.clone();
|
||||
if let Err(err) = entity.delete(ctx).await {
|
||||
warn!("Couldn't delete {}: {}", name, err);
|
||||
}
|
||||
|
||||
// and assuming all goes well, notify clients
|
||||
if let Some(ent) = api_model {
|
||||
nzr_event!(ctx.events, Deleted, ent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,11 +3,13 @@ use diesel::{
|
|||
SqliteConnection,
|
||||
};
|
||||
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
|
||||
use log::trace;
|
||||
use nzr_virt::{vol, Connection};
|
||||
use std::ops::Deref;
|
||||
use thiserror::Error;
|
||||
|
||||
use nzr_api::{config::Config, event::server::EventServer};
|
||||
use crate::dns::ZoneData;
|
||||
use nzr_api::config::Config;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -40,8 +42,8 @@ impl Deref for Context {
|
|||
pub struct InnerCtx {
|
||||
pub sqldb: diesel::r2d2::Pool<ConnectionManager<SqliteConnection>>,
|
||||
pub config: Config,
|
||||
pub zones: crate::dns::ZoneData,
|
||||
pub virt: VirtCtx,
|
||||
pub events: Arc<EventServer>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
|
@ -58,6 +60,7 @@ pub enum ContextError {
|
|||
|
||||
impl InnerCtx {
|
||||
async fn new(config: Config) -> Result<Self, ContextError> {
|
||||
let zones = ZoneData::new(&config.dns);
|
||||
let conn = Connection::open(&config.libvirt_uri)?;
|
||||
|
||||
let pools = PoolRefs {
|
||||
|
@ -66,7 +69,7 @@ impl InnerCtx {
|
|||
baseimg: conn.get_pool(&config.storage.base_image_pool).await?,
|
||||
};
|
||||
|
||||
tracing::trace!("Connecting to database");
|
||||
trace!("Connecting to database");
|
||||
let db_uri = config.db_uri.clone();
|
||||
let sqldb = tokio::task::spawn_blocking(|| {
|
||||
let manager = ConnectionManager::<SqliteConnection>::new(db_uri);
|
||||
|
@ -77,7 +80,7 @@ impl InnerCtx {
|
|||
.unwrap()?;
|
||||
|
||||
{
|
||||
tracing::trace!("Running pending migrations");
|
||||
trace!("Running pending migrations");
|
||||
let mut conn = sqldb.get()?;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
conn.run_pending_migrations(MIGRATIONS)
|
||||
|
@ -87,13 +90,11 @@ impl InnerCtx {
|
|||
.unwrap()?;
|
||||
}
|
||||
|
||||
let events = Arc::new(EventServer::new());
|
||||
|
||||
Ok(Self {
|
||||
sqldb,
|
||||
config,
|
||||
zones,
|
||||
virt: VirtCtx { conn, pools },
|
||||
events,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
use crate::model::Subnet;
|
||||
use log::*;
|
||||
use nzr_api::config::DNSConfig;
|
||||
use std::borrow::Borrow;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::net::Ipv4Addr;
|
||||
use std::ops::Deref;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
use nzr_api::model::{Instance, SubnetData};
|
||||
|
||||
use hickory_proto::rr::Name;
|
||||
use hickory_server::authority::{AuthorityObject, Catalog};
|
||||
use hickory_server::proto::rr::{rdata::soa, RData, RecordSet};
|
||||
|
@ -69,7 +70,7 @@ pub struct InnerZD {
|
|||
}
|
||||
|
||||
pub fn make_rectree_with_soa(name: &Name, config: &DNSConfig) -> BTreeMap<RrKey, RecordSet> {
|
||||
tracing::debug!("Creating initial SOA for {}", &name);
|
||||
debug!("Creating initial SOA for {}", &name);
|
||||
let mut records: BTreeMap<RrKey, RecordSet> = BTreeMap::new();
|
||||
let soa_key = RrKey::new(
|
||||
LowerName::from(name),
|
||||
|
@ -118,28 +119,24 @@ impl InnerZD {
|
|||
}
|
||||
|
||||
/// Creates a new DNS zone for the given subnet.
|
||||
pub async fn new_zone(
|
||||
&self,
|
||||
zone_id: impl AsRef<str>,
|
||||
subnet: &SubnetData,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
pub async fn new_zone(&self, subnet: &Subnet) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if let Some(name) = &subnet.domain_name {
|
||||
let rectree = make_rectree_with_soa(name, &self.config);
|
||||
let name: Name = name.parse()?;
|
||||
let rectree = make_rectree_with_soa(&name, &self.config);
|
||||
let auth = InMemoryAuthority::new(
|
||||
name.clone(),
|
||||
name,
|
||||
rectree,
|
||||
hickory_server::authority::ZoneType::Primary,
|
||||
false,
|
||||
)?;
|
||||
self.import(zone_id.as_ref(), auth).await;
|
||||
self.import(&subnet.ifname.to_string(), auth).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Generates a zone with the given records.
|
||||
async fn import(&self, name: &str, auth: InMemoryAuthority) {
|
||||
pub async fn import(&self, name: &str, auth: InMemoryAuthority) {
|
||||
let auth_arc = Arc::new(auth);
|
||||
tracing::debug!(
|
||||
log::debug!(
|
||||
"Importing {} with {} records...",
|
||||
name,
|
||||
auth_arc.records().await.len()
|
||||
|
@ -162,23 +159,26 @@ impl InnerZD {
|
|||
}
|
||||
|
||||
/// Adds a new host record in the DNS zone.
|
||||
pub async fn new_record(&self, inst: &Instance) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let hostname = Name::from_str(&inst.name)?;
|
||||
pub async fn new_record(
|
||||
&self,
|
||||
interface: &str,
|
||||
name: &str,
|
||||
addr: Ipv4Addr,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let hostname = Name::from_str(name)?;
|
||||
let zones = self.map.lock().await;
|
||||
let zone = zones.get(&inst.lease.subnet).unwrap_or(&self.default_zone);
|
||||
let zone = zones.get(interface).unwrap_or(&self.default_zone);
|
||||
let fqdn = {
|
||||
let origin: Name = zone.origin().into();
|
||||
hostname.append_domain(&origin)?
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
log::debug!(
|
||||
"Creating new host entry {} in zone {}...",
|
||||
&fqdn,
|
||||
zone.origin()
|
||||
);
|
||||
|
||||
let addr = inst.lease.addr.addr;
|
||||
|
||||
let record = Record::from_rdata(fqdn, 3600, RData::A(addr.into()));
|
||||
zone.upsert(record, 0).await;
|
||||
self.catalog()
|
||||
|
@ -189,10 +189,14 @@ impl InnerZD {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_record(&self, inst: &Instance) -> Result<bool, Box<dyn std::error::Error>> {
|
||||
let hostname = Name::from_str(&inst.name)?;
|
||||
pub async fn delete_record(
|
||||
&self,
|
||||
interface: &str,
|
||||
name: &str,
|
||||
) -> Result<bool, Box<dyn std::error::Error>> {
|
||||
let hostname = Name::from_str(name)?;
|
||||
let mut zones = self.map.lock().await;
|
||||
if let Some(zone) = zones.get_mut(&inst.lease.subnet) {
|
||||
if let Some(zone) = zones.get_mut(interface) {
|
||||
let hostname: LowerName = hostname.into();
|
||||
self.catalog.0.write().await.remove(&hostname);
|
||||
let key = RrKey::new(hostname, hickory_server::proto::rr::RecordType::A);
|
|
@ -1,16 +0,0 @@
|
|||
use std::io;
|
||||
|
||||
use tokio::net::UnixListener;
|
||||
|
||||
use crate::ctx::Context;
|
||||
|
||||
pub async fn event_server(ctx: Context) -> io::Result<()> {
|
||||
let sock = UnixListener::bind(&ctx.config.rpc.events_sock)?;
|
||||
|
||||
loop {
|
||||
// Wait until we have an available slot for a connection
|
||||
ctx.events.until_available().await;
|
||||
let (client, addr) = sock.accept().await?;
|
||||
ctx.events.spawn(client, addr).await?;
|
||||
}
|
||||
}
|
|
@ -1,41 +1,79 @@
|
|||
mod cmd;
|
||||
mod ctrl;
|
||||
mod ctx;
|
||||
mod event;
|
||||
mod dns;
|
||||
mod model;
|
||||
mod rpc;
|
||||
|
||||
use std::str::FromStr;
|
||||
|
||||
use hickory_server::ServerFuture;
|
||||
use log::LevelFilter;
|
||||
use log::*;
|
||||
use model::{Instance, Subnet};
|
||||
use nzr_api::config;
|
||||
use std::{net::IpAddr, str::FromStr};
|
||||
use tokio::net::UdpSocket;
|
||||
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let cfg: config::Config = config::Config::figment().extract()?;
|
||||
let ctx = ctx::Context::new(cfg).await?;
|
||||
|
||||
let mut bad_loglevel = false;
|
||||
let log_level = tracing::Level::from_str(&ctx.config.log_level).unwrap_or_else(|_| {
|
||||
bad_loglevel = true;
|
||||
tracing::Level::WARN
|
||||
});
|
||||
syslog::init_unix(
|
||||
syslog::Facility::LOG_DAEMON,
|
||||
LevelFilter::from_str(ctx.config.log_level.as_str())?,
|
||||
)?;
|
||||
|
||||
tracing_subscriber::fmt().with_max_level(log_level).init();
|
||||
info!("Hydrating initial zones...");
|
||||
for subnet in Subnet::all(&ctx).await? {
|
||||
// A records
|
||||
if let Err(err) = ctx.zones.new_zone(&subnet).await {
|
||||
error!("Couldn't create zone for {}: {}", &subnet.ifname, err);
|
||||
continue;
|
||||
}
|
||||
match Instance::all_in_subnet(&ctx, &subnet).await {
|
||||
Ok(leases) => {
|
||||
for lease in leases {
|
||||
let Ok(lease_addr) = subnet.network.make_ip(lease.host_num as u32) else {
|
||||
warn!("Ignoring {} due to lease address issue", &lease.name);
|
||||
continue;
|
||||
};
|
||||
|
||||
if bad_loglevel {
|
||||
tracing::warn!("Couldn't parse log level from config, defaulting to {log_level}");
|
||||
if let Err(err) = ctx
|
||||
.zones
|
||||
.new_record(&subnet.ifname.to_string(), &lease.name, lease_addr)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
"Failed to set up lease for {} in {}: {}",
|
||||
&lease.name, &subnet.ifname, err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Couldn't get leases for {}: {}", &subnet.ifname, err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run both the RPC and events servers
|
||||
// DNS init
|
||||
let mut dns_listener = ServerFuture::new(ctx.zones.catalog());
|
||||
let dns_socket = {
|
||||
let dns_ip: IpAddr = ctx.config.dns.listen_addr.parse()?;
|
||||
UdpSocket::bind((dns_ip, ctx.config.dns.port)).await?
|
||||
};
|
||||
dns_listener.register_socket(dns_socket);
|
||||
|
||||
tokio::select! {
|
||||
res = rpc::serve(ctx.clone()) => {
|
||||
if let Err(err) = res {
|
||||
tracing::error!("RPC server error: {err}");
|
||||
error!("Error from RPC: {}", err);
|
||||
}
|
||||
},
|
||||
res = event::event_server(ctx.clone()) => {
|
||||
res = dns_listener.block_until_done() => {
|
||||
if let Err(err) = res {
|
||||
tracing::error!("Event server error: {err}");
|
||||
error!("Error from DNS: {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,14 +20,12 @@ use tx::Transactable;
|
|||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ModelError {
|
||||
#[error("{0}")]
|
||||
#[error("Database error occurred: {0}")]
|
||||
Db(#[from] diesel::result::Error),
|
||||
#[error("Database pool error ({0})")]
|
||||
#[error("Unable to get database handle: {0}")]
|
||||
Pool(#[from] diesel::r2d2::PoolError),
|
||||
#[error("{0}")]
|
||||
Cidr(#[from] cidr::Error),
|
||||
#[error("Instance belongs to a subnet that has since disappeared")]
|
||||
NoSubnet,
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
|
@ -249,7 +247,10 @@ impl Instance {
|
|||
|
||||
/// Creates an [nzr_api::model::Instance] from the information available in
|
||||
/// the database.
|
||||
pub async fn api_model(&self, ctx: &Context) -> Result<nzr_api::model::Instance, ModelError> {
|
||||
pub async fn api_model(
|
||||
&self,
|
||||
ctx: &Context,
|
||||
) -> Result<nzr_api::model::Instance, Box<dyn std::error::Error>> {
|
||||
let netid = self.subnet_id;
|
||||
let Some(subnet) = ctx
|
||||
.spawn_db(move |mut db| Subnet::table().find(netid).load::<Subnet>(&mut db))
|
||||
|
@ -257,7 +258,7 @@ impl Instance {
|
|||
.into_iter()
|
||||
.next()
|
||||
else {
|
||||
return Err(ModelError::NoSubnet);
|
||||
todo!("something went horribly wrong");
|
||||
};
|
||||
|
||||
Ok(nzr_api::model::Instance {
|
||||
|
@ -403,7 +404,7 @@ impl Subnet {
|
|||
// the API.
|
||||
Ok(addr) => Some(addr),
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
log::error!(
|
||||
"Error parsing DNS server '{}' for {}: {}",
|
||||
s,
|
||||
&self.name,
|
||||
|
@ -415,7 +416,7 @@ impl Subnet {
|
|||
.collect(),
|
||||
domain_name: self.domain_name.as_ref().map(|s| {
|
||||
Name::from_str(s).unwrap_or_else(|e| {
|
||||
tracing::error!("Error parsing DNS name for {}: {}", &self.name, e);
|
||||
log::error!("Error parsing DNS name for {}: {}", &self.name, e);
|
||||
Name::default()
|
||||
})
|
||||
}),
|
||||
|
|
|
@ -48,7 +48,7 @@ impl<'a, T: Transactable> Drop for Transaction<'a, T> {
|
|||
let ctx = self.ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = inner.undo_tx(&ctx).await {
|
||||
tracing::error!("Error undoing transaction: {err}");
|
||||
log::error!("Error undoing transaction: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
164
nzrd/src/rpc.rs
164
nzrd/src/rpc.rs
|
@ -1,6 +1,5 @@
|
|||
use futures::{future, StreamExt};
|
||||
use nzr_api::error::{ApiError, ErrorType, ToApiResult};
|
||||
use nzr_api::{args, model, nzr_event, InstanceQuery, Nazrin};
|
||||
use nzr_api::{args, model, InstanceQuery, Nazrin};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use tarpc::server::{BaseChannel, Channel};
|
||||
|
@ -14,8 +13,8 @@ use uuid::Uuid;
|
|||
use crate::cmd;
|
||||
use crate::ctx::Context;
|
||||
use crate::model::{Instance, SshPubkey, Subnet};
|
||||
use log::*;
|
||||
use std::collections::HashMap;
|
||||
use tracing::*;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NzrServer {
|
||||
|
@ -37,7 +36,7 @@ impl Nazrin for NzrServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
build_args: args::NewInstance,
|
||||
) -> Result<uuid::Uuid, ApiError> {
|
||||
) -> Result<uuid::Uuid, String> {
|
||||
let progress = Arc::new(RwLock::new(crate::ctrl::vm::Progress {
|
||||
status_text: "Starting...".to_owned(),
|
||||
percentage: 0.0,
|
||||
|
@ -45,11 +44,13 @@ impl Nazrin for NzrServer {
|
|||
let prog_task = progress.clone();
|
||||
let build_task = tokio::spawn(async move {
|
||||
let (inst, dom) =
|
||||
cmd::vm::new_instance(self.ctx.clone(), prog_task.clone(), &build_args).await?;
|
||||
cmd::vm::new_instance(self.ctx.clone(), prog_task.clone(), &build_args)
|
||||
.await
|
||||
.map_err(|e| format!("Instance creation failed: {}", e))?;
|
||||
let mut api_model = inst
|
||||
.api_model(&self.ctx)
|
||||
.await
|
||||
.to_api_with("Couldn't generate API response")?;
|
||||
.map_err(|e| format!("Couldn't generate API response: {e}"))?;
|
||||
match dom.state().await {
|
||||
Ok(state) => {
|
||||
api_model.state = state.into();
|
||||
|
@ -58,9 +59,6 @@ impl Nazrin for NzrServer {
|
|||
warn!("Unable to get instance state: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
// Inform event listeners
|
||||
nzr_event!(self.ctx.events, Created, api_model);
|
||||
Ok(api_model)
|
||||
});
|
||||
|
||||
|
@ -96,7 +94,7 @@ impl Nazrin for NzrServer {
|
|||
Some(
|
||||
task.inner
|
||||
.await
|
||||
.to_api_with("Task failed with panic")
|
||||
.map_err(|err| format!("Task failed with panic: {}", err))
|
||||
.and_then(|res| res),
|
||||
)
|
||||
} else {
|
||||
|
@ -110,16 +108,10 @@ impl Nazrin for NzrServer {
|
|||
})
|
||||
}
|
||||
|
||||
async fn delete_instance(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
name: String,
|
||||
) -> Result<(), ApiError> {
|
||||
let api_model = cmd::vm::delete_instance(self.ctx.clone(), name).await?;
|
||||
|
||||
if let Some(api_model) = api_model {
|
||||
nzr_event!(self.ctx.events, Deleted, api_model);
|
||||
}
|
||||
async fn delete_instance(self, _: tarpc::context::Context, name: String) -> Result<(), String> {
|
||||
cmd::vm::delete_instance(self.ctx.clone(), name)
|
||||
.await
|
||||
.map_err(|e| format!("Couldn't delete instance: {}", e))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -127,16 +119,18 @@ impl Nazrin for NzrServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
query: nzr_api::InstanceQuery,
|
||||
) -> Result<Option<model::Instance>, ApiError> {
|
||||
) -> Result<Option<model::Instance>, String> {
|
||||
let res = match query {
|
||||
InstanceQuery::Name(name) => Instance::get_by_name(&self.ctx, name).await,
|
||||
InstanceQuery::MacAddr(addr) => Instance::get_by_mac(&self.ctx, addr).await,
|
||||
InstanceQuery::Ipv4Addr(addr) => Instance::get_by_ip4(&self.ctx, addr).await,
|
||||
}
|
||||
.to_api()?;
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
if let Some(inst) = res {
|
||||
inst.api_model(&self.ctx).await.to_api().map(Some)
|
||||
inst.api_model(&self.ctx)
|
||||
.await
|
||||
.map_or_else(|e| Err(e.to_string()), |m| Ok(Some(m)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
|
@ -146,10 +140,10 @@ impl Nazrin for NzrServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
with_status: bool,
|
||||
) -> Result<Vec<model::Instance>, ApiError> {
|
||||
) -> Result<Vec<model::Instance>, String> {
|
||||
let db_models = Instance::all(&self.ctx)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?;
|
||||
.map_err(|e| format!("Unable to get all instances: {e}"))?;
|
||||
let mut models = Vec::new();
|
||||
for inst in db_models {
|
||||
let mut api_model = match inst.api_model(&self.ctx).await {
|
||||
|
@ -187,39 +181,34 @@ impl Nazrin for NzrServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
build_args: model::Subnet,
|
||||
) -> Result<model::Subnet, ApiError> {
|
||||
let subnet = Subnet::insert(&self.ctx, build_args.name, build_args.data)
|
||||
) -> Result<model::Subnet, String> {
|
||||
cmd::net::add_subnet(&self.ctx, build_args)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?
|
||||
.map_err(|e| e.to_string())?
|
||||
.api_model()
|
||||
.to_api_with("Unable to generate API model")?;
|
||||
|
||||
// inform event listeners
|
||||
nzr_event!(self.ctx.events, Created, subnet);
|
||||
Ok(subnet)
|
||||
.map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
async fn modify_subnet(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
edit_args: model::Subnet,
|
||||
) -> Result<model::Subnet, ApiError> {
|
||||
) -> Result<model::Subnet, String> {
|
||||
if let Some(subnet) = Subnet::get_by_name(&self.ctx, &edit_args.name)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?
|
||||
{
|
||||
Err("Modifying subnets not yet supported".into())
|
||||
todo!("support updating Subnets")
|
||||
} else {
|
||||
Err(ErrorType::NotFound.into())
|
||||
Err(format!("Subnet {} not found", &edit_args.name))
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_subnets(self, _: tarpc::context::Context) -> Result<Vec<model::Subnet>, ApiError> {
|
||||
Subnet::all(&self.ctx)
|
||||
.await
|
||||
.to_api_with("Couldn't get list of subnets")
|
||||
.map(|v| {
|
||||
v.into_iter()
|
||||
async fn get_subnets(self, _: tarpc::context::Context) -> Result<Vec<model::Subnet>, String> {
|
||||
Subnet::all(&self.ctx).await.map_or_else(
|
||||
|e| Err(e.to_string()),
|
||||
|v| {
|
||||
Ok(v.into_iter()
|
||||
.filter_map(|s| match s.api_model() {
|
||||
Ok(model) => Some(model),
|
||||
Err(err) => {
|
||||
|
@ -227,43 +216,23 @@ impl Nazrin for NzrServer {
|
|||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.collect())
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
async fn delete_subnet(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
subnet_name: String,
|
||||
) -> Result<(), ApiError> {
|
||||
if let Some(subnet) = Subnet::get_by_name(&self.ctx, subnet_name)
|
||||
) -> Result<(), String> {
|
||||
cmd::net::delete_subnet(&self.ctx, &subnet_name)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?
|
||||
{
|
||||
let api_model = match subnet.api_model() {
|
||||
Ok(model) => Some(model),
|
||||
Err(err) => {
|
||||
tracing::error!("Unable to generate model for clients: {err}");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
subnet
|
||||
.delete(&self.ctx)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?;
|
||||
|
||||
if let Some(api_model) = api_model {
|
||||
nzr_event!(&self.ctx.events, Deleted, api_model);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ErrorType::NotFound.into())
|
||||
}
|
||||
.map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), ApiError> {
|
||||
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), String> {
|
||||
cmd::vm::prune_instances(&self.ctx)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
@ -274,52 +243,50 @@ impl Nazrin for NzrServer {
|
|||
self,
|
||||
_: tarpc::context::Context,
|
||||
id: i32,
|
||||
) -> Result<Vec<u8>, ApiError> {
|
||||
if let Some(db_model) = Instance::get(&self.ctx, id)
|
||||
) -> Result<Vec<u8>, String> {
|
||||
let Some(db_model) = Instance::get(&self.ctx, id)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?
|
||||
{
|
||||
Ok(db_model.ci_userdata.unwrap_or_default())
|
||||
} else {
|
||||
Err(ErrorType::NotFound.into())
|
||||
}
|
||||
.map_err(|e| e.to_string())?
|
||||
else {
|
||||
return Err("Instance doesn't exist".to_owned());
|
||||
};
|
||||
|
||||
Ok(db_model.ci_userdata.unwrap_or_default())
|
||||
}
|
||||
|
||||
async fn get_ssh_pubkeys(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
) -> Result<Vec<model::SshPubkey>, ApiError> {
|
||||
SshPubkey::all(&self.ctx)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)
|
||||
.map(|k| k.iter().map(|k| k.api_model()).collect())
|
||||
) -> Result<Vec<model::SshPubkey>, String> {
|
||||
SshPubkey::all(&self.ctx).await.map_or_else(
|
||||
|e| Err(e.to_string()),
|
||||
|k| Ok(k.iter().map(|k| k.api_model()).collect()),
|
||||
)
|
||||
}
|
||||
|
||||
async fn add_ssh_pubkey(
|
||||
self,
|
||||
_: tarpc::context::Context,
|
||||
pub_key: String,
|
||||
) -> Result<model::SshPubkey, ApiError> {
|
||||
let pubkey = model::SshPubkey::from_str(&pub_key).to_api_type(ErrorType::Parse)?;
|
||||
) -> Result<model::SshPubkey, String> {
|
||||
let pubkey = model::SshPubkey::from_str(&pub_key).map_err(|e| e.to_string())?;
|
||||
|
||||
SshPubkey::insert(&self.ctx, pubkey.algorithm, pubkey.key_data, pubkey.comment)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)
|
||||
.map_err(|e| e.to_string())
|
||||
.map(|k| k.api_model())
|
||||
}
|
||||
|
||||
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), ApiError> {
|
||||
if let Some(key) = SshPubkey::get(&self.ctx, id)
|
||||
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), String> {
|
||||
let Some(key) = SshPubkey::get(&self.ctx, id)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?
|
||||
{
|
||||
key.delete(&self.ctx)
|
||||
.await
|
||||
.to_api_type(ErrorType::Database)?;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ErrorType::NotFound.into())
|
||||
}
|
||||
.map_err(|e| e.to_string())?
|
||||
else {
|
||||
return Err("SSH key with ID doesn't exist".into());
|
||||
};
|
||||
|
||||
key.delete(&self.ctx).await.map_err(|e| e.to_string())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -357,6 +324,7 @@ pub async fn serve(ctx: Context) -> Result<(), Box<dyn std::error::Error>> {
|
|||
debug!("Listening for new connection...");
|
||||
let (conn, _addr) = listener.accept().await?;
|
||||
let ctx = ctx.clone();
|
||||
// hack?
|
||||
tokio::spawn(async move {
|
||||
let framed = codec_builder.new_framed(conn);
|
||||
let transport = tarpc::serde_transport::new(framed, Bincode::default());
|
||||
|
@ -372,6 +340,6 @@ pub async fn serve(ctx: Context) -> Result<(), Box<dyn std::error::Error>> {
|
|||
}
|
||||
|
||||
struct InstCreateStatus {
|
||||
inner: JoinHandle<Result<model::Instance, ApiError>>,
|
||||
inner: JoinHandle<Result<model::Instance, String>>,
|
||||
progress: Arc<RwLock<crate::ctrl::vm::Progress>>,
|
||||
}
|
||||
|
|
|
@ -125,7 +125,11 @@ async fn handle_message(ctx: &Context, from: SocketAddr, msg: &Message) {
|
|||
|
||||
{
|
||||
let opts = response.opts_mut();
|
||||
let giaddr = msg.giaddr();
|
||||
let giaddr = if msg.giaddr().is_unspecified() {
|
||||
todo!("no relay??")
|
||||
} else {
|
||||
msg.giaddr()
|
||||
};
|
||||
|
||||
opts.insert(DhcpOption::ServerIdentifier(giaddr));
|
||||
if let Some(time) = lease_time {
|
||||
|
|
|
@ -1,15 +0,0 @@
|
|||
[package]
|
||||
name = "nzrdns"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
||||
nzr-api = { path = "../nzr-api" }
|
||||
hickory-server = "0.24"
|
||||
hickory-proto = { version = "0.24", features = ["serde-config"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
async-trait = "0.1"
|
||||
futures = "0.3"
|
||||
anyhow = "1"
|
|
@ -1,169 +0,0 @@
|
|||
use std::{net::IpAddr, process::ExitCode};
|
||||
|
||||
use anyhow::Context;
|
||||
use dns::ZoneData;
|
||||
use futures::StreamExt;
|
||||
use hickory_server::ServerFuture;
|
||||
use nzr_api::{
|
||||
config::Config,
|
||||
event::{client::EventClient, EventMessage, ResourceAction},
|
||||
NazrinClient,
|
||||
};
|
||||
use tokio::{
|
||||
io::AsyncRead,
|
||||
net::{UdpSocket, UnixStream},
|
||||
};
|
||||
|
||||
mod dns;
|
||||
|
||||
/// Function to handle incoming events from Nazrin and update the DNS database
|
||||
/// accordingly.
|
||||
async fn event_handler<T: AsyncRead>(zones: ZoneData, mut events: EventClient<T>) {
|
||||
while let Some(event) = events.next().await {
|
||||
match event {
|
||||
Ok(EventMessage::Instance(event)) => {
|
||||
let ent = &event.entity;
|
||||
match event.action {
|
||||
ResourceAction::Created => {
|
||||
if let Err(err) = zones.new_record(ent).await {
|
||||
tracing::error!("Unable to add record {}: {err}", ent.name);
|
||||
}
|
||||
}
|
||||
ResourceAction::Deleted => {
|
||||
if let Err(err) = zones.delete_record(ent).await {
|
||||
tracing::error!("Unable to delete record {}: {err}", ent.name);
|
||||
}
|
||||
}
|
||||
misc => {
|
||||
tracing::debug!("ignoring instance action {misc:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(EventMessage::Subnet(event)) => {
|
||||
let ent = &event.entity;
|
||||
match event.action {
|
||||
ResourceAction::Created => {
|
||||
if let Some(name) = ent.data.domain_name.as_ref() {
|
||||
if let Err(err) = zones.new_zone(&ent.name, &ent.data).await {
|
||||
tracing::error!("Unable to add zone {name}: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
ResourceAction::Deleted => {
|
||||
if ent.data.domain_name.as_ref().is_some() {
|
||||
zones.delete_zone(&ent.name).await;
|
||||
}
|
||||
}
|
||||
misc => {
|
||||
tracing::debug!("ignoring subnet action {misc:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Error getting events: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::warn!("No more events! (did Nazrin shut down?)");
|
||||
}
|
||||
|
||||
/// Hydrates all existing DNS zones.
|
||||
async fn hydrate_zones(zones: ZoneData, api_client: NazrinClient) -> anyhow::Result<()> {
|
||||
tracing::info!("Hydrating initial zones...");
|
||||
let subnets = api_client
|
||||
.get_subnets(nzr_api::default_ctx())
|
||||
.await
|
||||
.context("RPC error getting subnets")?
|
||||
.map_err(|e| anyhow::anyhow!("API error getting subnets: {e}"))?;
|
||||
|
||||
let instances = api_client
|
||||
.get_instances(nzr_api::default_ctx(), false)
|
||||
.await
|
||||
.context("RPC error getting instances")?
|
||||
.map_err(|e| anyhow::anyhow!("API error getting instances: {e}"))?;
|
||||
|
||||
for subnet in subnets {
|
||||
if let Err(err) = zones.new_zone(&subnet.name, &subnet.data).await {
|
||||
tracing::warn!("Couldn't create zone for {}: {err}", &subnet.name);
|
||||
}
|
||||
}
|
||||
|
||||
for instance in instances {
|
||||
if let Err(err) = zones.new_record(&instance).await {
|
||||
tracing::warn!("Couldn't create zone entry for {}: {err}", &instance.name);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> ExitCode {
|
||||
tracing_subscriber::fmt::init();
|
||||
let cfg: Config = match Config::figment().extract() {
|
||||
Ok(cfg) => cfg,
|
||||
Err(err) => {
|
||||
tracing::error!("Error parsing config: {err}");
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
};
|
||||
|
||||
let api_client = {
|
||||
let sock = match UnixStream::connect(&cfg.rpc.socket_path).await {
|
||||
Ok(sock) => sock,
|
||||
Err(err) => {
|
||||
tracing::error!("Connection to nzrd failed: {err}");
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
};
|
||||
nzr_api::new_client(sock)
|
||||
};
|
||||
let events = {
|
||||
let sock = match UnixStream::connect(&cfg.rpc.events_sock).await {
|
||||
Ok(sock) => sock,
|
||||
Err(err) => {
|
||||
tracing::error!("Connections to events stream failed: {err}");
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
};
|
||||
nzr_api::event::client::EventClient::new(sock)
|
||||
};
|
||||
|
||||
let zones = ZoneData::new(&cfg.dns);
|
||||
|
||||
if let Err(err) = hydrate_zones(zones.clone(), api_client.clone()).await {
|
||||
tracing::error!("{err}");
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
|
||||
let mut dns_listener = ServerFuture::new(zones.catalog());
|
||||
let dns_socket = {
|
||||
let Ok(dns_ip) = cfg.dns.listen_addr.parse::<IpAddr>() else {
|
||||
tracing::error!("Unable to parse listen_addr");
|
||||
return ExitCode::FAILURE;
|
||||
};
|
||||
|
||||
match UdpSocket::bind((dns_ip, cfg.dns.port)).await {
|
||||
Ok(sock) => sock,
|
||||
Err(err) => {
|
||||
tracing::error!("Couldn't bind to {dns_ip}:{}: {err}", cfg.dns.port);
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
}
|
||||
};
|
||||
dns_listener.register_socket(dns_socket);
|
||||
|
||||
tokio::select! {
|
||||
_ = event_handler(zones.clone(), events) => {
|
||||
// nothing to do here
|
||||
},
|
||||
res = dns_listener.block_until_done() => {
|
||||
if let Err(err) = res {
|
||||
tracing::error!("Error from DNS: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ExitCode::SUCCESS
|
||||
}
|
|
@ -57,7 +57,7 @@ impl Context {
|
|||
}
|
||||
|
||||
pub async fn get_sshkeys(&self) -> Result<Vec<SshPubkey>> {
|
||||
// We don't cache SSH keys, so always get from the API server
|
||||
// TODO: do we cache SSH keys? I don't like the idea of it
|
||||
let ssh_keys = self
|
||||
.api_client
|
||||
.get_ssh_pubkeys(nzr_api::default_ctx())
|
||||
|
|
|
@ -41,6 +41,7 @@ async fn get_meta_data(
|
|||
Ok(Some(inst)) => {
|
||||
let meta = Metadata {
|
||||
inst_name: &inst.name,
|
||||
// XXX: this is very silly imo
|
||||
ssh_pubkeys: ssh_pubkeys.iter().collect(),
|
||||
};
|
||||
|
||||
|
@ -98,7 +99,7 @@ async fn get_vendor_data(
|
|||
// admin username from an unknown instance.
|
||||
if let IpAddr::V4(ip) = addr.ip() {
|
||||
match ctx.get_instance(ip).await {
|
||||
Ok(Some(_)) => {
|
||||
Ok(_) => {
|
||||
let data = model::VendorData {
|
||||
username: Some(&ctx.cfg().cloud.admin_user),
|
||||
};
|
||||
|
@ -107,14 +108,14 @@ async fn get_vendor_data(
|
|||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})
|
||||
}
|
||||
Ok(None) => {
|
||||
tracing::warn!("Request from unregistered server {ip}");
|
||||
Err(StatusCode::FORBIDDEN)
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("{err}");
|
||||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
_ => {
|
||||
tracing::warn!("Request from unregistered server {ip}");
|
||||
Err(StatusCode::FORBIDDEN)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Err(StatusCode::BAD_REQUEST)
|
||||
|
|
Loading…
Reference in a new issue