Compare commits
No commits in common. "40532c9e36443fe38775adadd15b383ec7f17c8d" and "0cb3aea62e27278a9db885c101f9bdfe90d4f96b" have entirely different histories.
40532c9e36
...
0cb3aea62e
30 changed files with 406 additions and 1127 deletions
75
Cargo.lock
generated
75
Cargo.lock
generated
|
@ -1046,6 +1046,17 @@ dependencies = [
|
||||||
"windows-sys 0.52.0",
|
"windows-sys 0.52.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hostname"
|
||||||
|
version = "0.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"libc",
|
||||||
|
"windows",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "http"
|
name = "http"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
|
@ -1521,6 +1532,15 @@ dependencies = [
|
||||||
"autocfg",
|
"autocfg",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "num_threads"
|
||||||
|
version = "0.1.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nzr"
|
name = "nzr"
|
||||||
version = "0.9.0"
|
version = "0.9.0"
|
||||||
|
@ -1548,12 +1568,9 @@ dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"regex",
|
"regex",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
|
||||||
"tarpc",
|
"tarpc",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-serde 0.9.0",
|
|
||||||
"tracing",
|
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -1587,6 +1604,7 @@ dependencies = [
|
||||||
"home",
|
"home",
|
||||||
"libc",
|
"libc",
|
||||||
"libsqlite3-sys",
|
"libsqlite3-sys",
|
||||||
|
"log",
|
||||||
"nix",
|
"nix",
|
||||||
"nzr-api",
|
"nzr-api",
|
||||||
"nzr-virt",
|
"nzr-virt",
|
||||||
|
@ -1598,13 +1616,12 @@ dependencies = [
|
||||||
"serde_with",
|
"serde_with",
|
||||||
"serde_yaml",
|
"serde_yaml",
|
||||||
"stdext",
|
"stdext",
|
||||||
|
"syslog",
|
||||||
"tarpc",
|
"tarpc",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-serde 0.9.0",
|
"tokio-serde 0.9.0",
|
||||||
"tracing",
|
|
||||||
"tracing-subscriber",
|
|
||||||
"trait-variant",
|
"trait-variant",
|
||||||
"uuid",
|
"uuid",
|
||||||
"zerocopy",
|
"zerocopy",
|
||||||
|
@ -1624,21 +1641,6 @@ dependencies = [
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "nzrdns"
|
|
||||||
version = "0.1.0"
|
|
||||||
dependencies = [
|
|
||||||
"anyhow",
|
|
||||||
"async-trait",
|
|
||||||
"futures",
|
|
||||||
"hickory-proto",
|
|
||||||
"hickory-server",
|
|
||||||
"nzr-api",
|
|
||||||
"tokio",
|
|
||||||
"tracing",
|
|
||||||
"tracing-subscriber",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "object"
|
name = "object"
|
||||||
version = "0.36.2"
|
version = "0.36.2"
|
||||||
|
@ -2289,6 +2291,18 @@ version = "1.0.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"
|
checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "syslog"
|
||||||
|
version = "7.0.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "019f1500a13379b7d051455df397c75770de6311a7a188a699499502704d9f10"
|
||||||
|
dependencies = [
|
||||||
|
"hostname",
|
||||||
|
"libc",
|
||||||
|
"log",
|
||||||
|
"time",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tabled"
|
name = "tabled"
|
||||||
version = "0.15.0"
|
version = "0.15.0"
|
||||||
|
@ -2412,7 +2426,9 @@ checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"deranged",
|
"deranged",
|
||||||
"itoa",
|
"itoa",
|
||||||
|
"libc",
|
||||||
"num-conv",
|
"num-conv",
|
||||||
|
"num_threads",
|
||||||
"powerfmt",
|
"powerfmt",
|
||||||
"serde",
|
"serde",
|
||||||
"time-core",
|
"time-core",
|
||||||
|
@ -2926,6 +2942,25 @@ version = "0.4.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows"
|
||||||
|
version = "0.52.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
|
||||||
|
dependencies = [
|
||||||
|
"windows-core",
|
||||||
|
"windows-targets",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "windows-core"
|
||||||
|
version = "0.52.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
|
||||||
|
dependencies = [
|
||||||
|
"windows-targets",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows-sys"
|
name = "windows-sys"
|
||||||
version = "0.42.0"
|
version = "0.42.0"
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["nzrd", "nzr-api", "client", "nzrdhcp", "nzr-virt", "omyacid", "nzrdns"]
|
members = ["nzrd", "nzr-api", "client", "nzrdhcp", "nzr-virt", "omyacid"]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
use clap::{CommandFactory, FromArgMatches, Parser, Subcommand};
|
use clap::{CommandFactory, FromArgMatches, Parser, Subcommand};
|
||||||
use nzr_api::config;
|
use nzr_api::config;
|
||||||
use nzr_api::error::Simplify;
|
|
||||||
use nzr_api::hickory_proto::rr::Name;
|
use nzr_api::hickory_proto::rr::Name;
|
||||||
use nzr_api::model;
|
use nzr_api::model;
|
||||||
use nzr_api::net::cidr::CidrV4;
|
use nzr_api::net::cidr::CidrV4;
|
||||||
|
@ -333,9 +332,9 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let mut net = client
|
let mut net = client
|
||||||
.get_subnets(nzr_api::default_ctx())
|
.get_subnets(nzr_api::default_ctx())
|
||||||
.await
|
.await
|
||||||
.simplify()
|
.map_err(|e| e.to_string())
|
||||||
.and_then(|res| {
|
.and_then(|res| {
|
||||||
res.iter()
|
res?.iter()
|
||||||
.find_map(|ent| {
|
.find_map(|ent| {
|
||||||
if ent.name == args.name {
|
if ent.name == args.name {
|
||||||
Some(ent.clone())
|
Some(ent.clone())
|
||||||
|
@ -343,7 +342,7 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.ok_or_else(|| format!("Couldn't find network {}", &args.name).into())
|
.ok_or_else(|| format!("Couldn't find network {}", &args.name))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// merge in the new args
|
// merge in the new args
|
||||||
|
@ -366,11 +365,15 @@ async fn handle_command() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// run the update
|
// run the update
|
||||||
let net = client
|
client
|
||||||
.modify_subnet(nzr_api::default_ctx(), net)
|
.modify_subnet(nzr_api::default_ctx(), net)
|
||||||
.await
|
.await
|
||||||
.simplify()?;
|
.map_err(|err| format!("RPC error: {}", err))
|
||||||
println!("Subnet {} updated.", net.name);
|
.and_then(|res| {
|
||||||
|
res.map(|e| {
|
||||||
|
println!("Subnet {} updated.", e.name);
|
||||||
|
})
|
||||||
|
})?;
|
||||||
}
|
}
|
||||||
NetCmd::Dump { name } => {
|
NetCmd::Dump { name } => {
|
||||||
let subnets = (client.get_subnets(nzr_api::default_ctx()).await?)?;
|
let subnets = (client.get_subnets(nzr_api::default_ctx()).await?)?;
|
||||||
|
|
|
@ -17,17 +17,14 @@ uuid = { version = "1.2.2", features = ["serde"] }
|
||||||
hickory-proto = { version = "0.24", features = ["serde-config"] }
|
hickory-proto = { version = "0.24", features = ["serde-config"] }
|
||||||
log = "0.4.17"
|
log = "0.4.17"
|
||||||
diesel = { version = "2.2", optional = true }
|
diesel = { version = "2.2", optional = true }
|
||||||
futures = "0.3"
|
futures = { version = "0.3", optional = true }
|
||||||
thiserror = "1"
|
thiserror = "1"
|
||||||
regex = "1"
|
regex = "1"
|
||||||
lazy_static = "1"
|
lazy_static = "1"
|
||||||
tracing = "0.1"
|
|
||||||
tokio-serde = { version = "0.9", features = ["bincode"] }
|
|
||||||
serde_json = "1"
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
uuid = { version = "1.2.2", features = ["serde", "v4"] }
|
uuid = { version = "1.2.2", features = ["serde", "v4"] }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
diesel = ["dep:diesel"]
|
diesel = ["dep:diesel"]
|
||||||
mock = []
|
mock = ["dep:futures"]
|
||||||
|
|
|
@ -72,7 +72,6 @@ impl CloudConfig {
|
||||||
pub struct RPCConfig {
|
pub struct RPCConfig {
|
||||||
pub socket_path: PathBuf,
|
pub socket_path: PathBuf,
|
||||||
pub admin_group: Option<String>,
|
pub admin_group: Option<String>,
|
||||||
pub events_sock: PathBuf,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The root configuration struct.
|
/// The root configuration struct.
|
||||||
|
@ -99,7 +98,6 @@ impl Default for Config {
|
||||||
rpc: RPCConfig {
|
rpc: RPCConfig {
|
||||||
socket_path: PathBuf::from("/var/run/nazrin/nzrd.sock"),
|
socket_path: PathBuf::from("/var/run/nazrin/nzrd.sock"),
|
||||||
admin_group: None,
|
admin_group: None,
|
||||||
events_sock: PathBuf::from("/var/run/nazrin/events.sock"),
|
|
||||||
},
|
},
|
||||||
db_uri: "sqlite:/var/lib/nazrin/main_sql.db".to_owned(),
|
db_uri: "sqlite:/var/lib/nazrin/main_sql.db".to_owned(),
|
||||||
libvirt_uri: match std::env::var("LIBVIRT_URI") {
|
libvirt_uri: match std::env::var("LIBVIRT_URI") {
|
||||||
|
|
|
@ -1,208 +0,0 @@
|
||||||
use std::fmt;
|
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use tarpc::client::RpcError;
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
|
||||||
pub enum ErrorType {
|
|
||||||
/// Entity was not found.
|
|
||||||
NotFound,
|
|
||||||
/// Error occurred with a database call.
|
|
||||||
Database,
|
|
||||||
/// Error occurred in a libvirt call.
|
|
||||||
VirtError,
|
|
||||||
/// Error occurred while parsing input.
|
|
||||||
Parse,
|
|
||||||
/// An unknown API error occurred.
|
|
||||||
Other,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ErrorType {
|
|
||||||
fn as_str(&self) -> &'static str {
|
|
||||||
match self {
|
|
||||||
ErrorType::NotFound => "Entity not found",
|
|
||||||
ErrorType::Database => "Database error",
|
|
||||||
ErrorType::VirtError => "libvirt error",
|
|
||||||
ErrorType::Parse => "Unable to parse input",
|
|
||||||
ErrorType::Other => "Unknown API error",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for ErrorType {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
self.as_str().fmt(f)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct ApiError {
|
|
||||||
error_type: ErrorType,
|
|
||||||
message: Option<String>,
|
|
||||||
inner: Option<InnerError>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for ApiError {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
self.message
|
|
||||||
.as_deref()
|
|
||||||
.unwrap_or_else(|| self.error_type.as_str())
|
|
||||||
.fmt(f)?;
|
|
||||||
if let Some(inner) = &self.inner {
|
|
||||||
write!(f, ": {inner}")?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::error::Error for ApiError {}
|
|
||||||
|
|
||||||
impl ApiError {
|
|
||||||
pub fn new<E>(error_type: ErrorType, message: impl Into<String>, err: E) -> Self
|
|
||||||
where
|
|
||||||
E: std::error::Error,
|
|
||||||
{
|
|
||||||
Self {
|
|
||||||
error_type,
|
|
||||||
message: Some(message.into()),
|
|
||||||
inner: Some(err.into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn err_type(&self) -> ErrorType {
|
|
||||||
self.error_type
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<ErrorType> for ApiError {
|
|
||||||
fn from(value: ErrorType) -> Self {
|
|
||||||
Self {
|
|
||||||
error_type: value,
|
|
||||||
message: None,
|
|
||||||
inner: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> From<T> for ApiError
|
|
||||||
where
|
|
||||||
T: AsRef<str>,
|
|
||||||
{
|
|
||||||
fn from(value: T) -> Self {
|
|
||||||
let value = value.as_ref();
|
|
||||||
Self {
|
|
||||||
error_type: ErrorType::Other,
|
|
||||||
message: Some(value.to_owned()),
|
|
||||||
inner: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct InnerError {
|
|
||||||
error_debug: String,
|
|
||||||
error_message: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for InnerError {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
self.error_debug.fmt(f)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for InnerError {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
self.error_message.fmt(f)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E> From<E> for InnerError
|
|
||||||
where
|
|
||||||
E: std::error::Error,
|
|
||||||
{
|
|
||||||
fn from(value: E) -> Self {
|
|
||||||
Self {
|
|
||||||
error_debug: format!("{value:?}"),
|
|
||||||
error_message: format!("{value}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait ToApiResult<T> {
|
|
||||||
/// Converts the result's error type to [`ApiError`] with [`ErrorType::Other`].
|
|
||||||
///
|
|
||||||
/// [`ApiError`]: nzr-api::error::ApiError
|
|
||||||
/// [`ErrorType::Other`]: nzr-api::error::ErrorType::Other
|
|
||||||
fn to_api(self) -> Result<T, ApiError>;
|
|
||||||
/// Converts the result's error type to [`ApiError`] with
|
|
||||||
/// [`ErrorType::Other`], and the given context.
|
|
||||||
///
|
|
||||||
/// [`ApiError`]: nzr-api::error::ApiError
|
|
||||||
/// [`ErrorType::Other`]: nzr-api::error::ErrorType::Other
|
|
||||||
fn to_api_with(self, context: impl AsRef<str>) -> Result<T, ApiError>;
|
|
||||||
/// Converts the result's error type to [`ApiError`] with the given
|
|
||||||
/// [`ErrorType`] and context.
|
|
||||||
///
|
|
||||||
/// [`ApiError`]: nzr-api::error::ApiError
|
|
||||||
/// [`ErrorType`]: nzr-api::error::ErrorType
|
|
||||||
fn to_api_with_type(self, err_type: ErrorType, context: impl AsRef<str>)
|
|
||||||
-> Result<T, ApiError>;
|
|
||||||
/// Converts the result's error type to [`ApiError`] with the given
|
|
||||||
/// [`ErrorType`].
|
|
||||||
///
|
|
||||||
/// [`ApiError`]: nzr-api::error::ApiError
|
|
||||||
/// [`ErrorType`]: nzr-api::error::ErrorType
|
|
||||||
fn to_api_type(self, err_type: ErrorType) -> Result<T, ApiError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, E> ToApiResult<T> for Result<T, E>
|
|
||||||
where
|
|
||||||
E: std::error::Error + 'static,
|
|
||||||
{
|
|
||||||
fn to_api(self) -> Result<T, ApiError> {
|
|
||||||
self.map_err(|e| ApiError {
|
|
||||||
error_type: ErrorType::Other,
|
|
||||||
message: None,
|
|
||||||
inner: Some(e.into()),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn to_api_with(self, context: impl AsRef<str>) -> Result<T, ApiError> {
|
|
||||||
self.map_err(|e| ApiError {
|
|
||||||
error_type: ErrorType::Other,
|
|
||||||
message: Some(context.as_ref().to_owned()),
|
|
||||||
inner: Some(e.into()),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn to_api_type(self, err_type: ErrorType) -> Result<T, ApiError> {
|
|
||||||
self.map_err(|e| ApiError {
|
|
||||||
error_type: err_type,
|
|
||||||
message: None,
|
|
||||||
inner: Some(e.into()),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn to_api_with_type(
|
|
||||||
self,
|
|
||||||
err_type: ErrorType,
|
|
||||||
context: impl AsRef<str>,
|
|
||||||
) -> Result<T, ApiError> {
|
|
||||||
self.map_err(|e| ApiError {
|
|
||||||
error_type: err_type,
|
|
||||||
message: Some(context.as_ref().to_owned()),
|
|
||||||
inner: Some(e.into()),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait Simplify<T> {
|
|
||||||
fn simplify(self) -> Result<T, ApiError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Simplify<T> for Result<Result<T, ApiError>, RpcError> {
|
|
||||||
/// Flattens a Result of `RpcError` and `ApiError` to just `ApiError`.
|
|
||||||
fn simplify(self) -> Result<T, ApiError> {
|
|
||||||
self.to_api_with("RPC Error").and_then(|r| r)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,53 +0,0 @@
|
||||||
use std::{pin::Pin, task::Poll};
|
|
||||||
|
|
||||||
use futures::{Stream, TryStreamExt};
|
|
||||||
use tarpc::tokio_util::codec::{FramedRead, LengthDelimitedCodec};
|
|
||||||
use tokio::io::AsyncRead;
|
|
||||||
|
|
||||||
use super::{EventError, EventMessage};
|
|
||||||
|
|
||||||
/// Client for receiving various events emitted by Nazrin.
|
|
||||||
pub struct EventClient<T>
|
|
||||||
where
|
|
||||||
T: AsyncRead,
|
|
||||||
{
|
|
||||||
transport: Pin<Box<FramedRead<T, LengthDelimitedCodec>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> EventClient<T>
|
|
||||||
where
|
|
||||||
T: AsyncRead,
|
|
||||||
{
|
|
||||||
/// Creates a new EventClient.
|
|
||||||
pub fn new(inner: T) -> Self {
|
|
||||||
let transport = FramedRead::new(inner, LengthDelimitedCodec::new());
|
|
||||||
Self {
|
|
||||||
transport: Box::pin(transport),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Stream for EventClient<T>
|
|
||||||
where
|
|
||||||
T: AsyncRead,
|
|
||||||
{
|
|
||||||
type Item = Result<EventMessage, EventError>;
|
|
||||||
|
|
||||||
fn poll_next(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> Poll<Option<Self::Item>> {
|
|
||||||
match self.as_mut().transport.try_poll_next_unpin(cx) {
|
|
||||||
Poll::Ready(res) => {
|
|
||||||
let our_res = res.map(|res| {
|
|
||||||
res.map_err(|e| e.into()).and_then(|bytes| {
|
|
||||||
let msg: EventMessage = serde_json::from_slice(&bytes)?;
|
|
||||||
Ok(msg)
|
|
||||||
})
|
|
||||||
});
|
|
||||||
Poll::Ready(our_res)
|
|
||||||
}
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,77 +0,0 @@
|
||||||
pub mod client;
|
|
||||||
pub mod server;
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test;
|
|
||||||
|
|
||||||
use std::io;
|
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use thiserror::Error;
|
|
||||||
|
|
||||||
use crate::model;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
|
||||||
pub enum ResourceAction {
|
|
||||||
/// The referenced resource was created.
|
|
||||||
Created,
|
|
||||||
/// The referenced resource was deleted, and is no longer available.
|
|
||||||
Deleted,
|
|
||||||
/// The referenced resource was modified in some way.
|
|
||||||
Modified,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Represents an event pertaining to a specific action.
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
||||||
pub struct ResourceEvent<T> {
|
|
||||||
pub action: ResourceAction,
|
|
||||||
/// The entity that was acted upon.
|
|
||||||
pub entity: T,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Represents any event that is emitted by Nazrin.
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
||||||
#[serde(tag = "event")]
|
|
||||||
pub enum EventMessage {
|
|
||||||
/// A subnet was created, modified, or deleted.
|
|
||||||
Subnet(ResourceEvent<model::Subnet>),
|
|
||||||
/// An instance was created, modified, or deleted.
|
|
||||||
Instance(ResourceEvent<model::Instance>),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
|
||||||
pub enum EventError {
|
|
||||||
#[error("Transport error: {0}")]
|
|
||||||
Transport(#[from] io::Error),
|
|
||||||
#[error("Serialization error: {0}")]
|
|
||||||
Json(#[from] serde_json::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait Emittable {
|
|
||||||
fn as_event(&self, action: ResourceAction) -> EventMessage;
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! emittable {
|
|
||||||
($t:ty, $msg:ident) => {
|
|
||||||
impl Emittable for $t {
|
|
||||||
fn as_event(&self, action: ResourceAction) -> EventMessage {
|
|
||||||
EventMessage::$msg(ResourceEvent {
|
|
||||||
action,
|
|
||||||
entity: self.clone(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
emittable!(model::Instance, Instance);
|
|
||||||
emittable!(model::Subnet, Subnet);
|
|
||||||
|
|
||||||
#[macro_export]
|
|
||||||
macro_rules! nzr_event {
|
|
||||||
($srv:expr, $act:ident, $ent:tt) => {{
|
|
||||||
use $crate::event::Emittable;
|
|
||||||
|
|
||||||
$srv.emit($ent.as_event($crate::event::ResourceAction::$act))
|
|
||||||
.await
|
|
||||||
}};
|
|
||||||
}
|
|
|
@ -1,191 +0,0 @@
|
||||||
use futures::Future;
|
|
||||||
use std::{fmt, io, pin::Pin};
|
|
||||||
|
|
||||||
use futures::SinkExt;
|
|
||||||
use tarpc::tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
|
|
||||||
use tokio::{
|
|
||||||
io::AsyncWrite,
|
|
||||||
sync::broadcast::{self, Receiver, Sender},
|
|
||||||
};
|
|
||||||
use tracing::instrument;
|
|
||||||
|
|
||||||
use super::EventMessage;
|
|
||||||
|
|
||||||
/// Representation of multiple types of SocketAddrs, because you can't have just
|
|
||||||
/// one!
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum SocketAddr {
|
|
||||||
#[cfg(unix)]
|
|
||||||
TokioUnix(tokio::net::unix::SocketAddr),
|
|
||||||
#[cfg(unix)]
|
|
||||||
Unix(std::os::unix::net::SocketAddr),
|
|
||||||
Net(std::net::SocketAddr),
|
|
||||||
#[cfg(test)]
|
|
||||||
None,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for SocketAddr {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
match self {
|
|
||||||
#[cfg(unix)]
|
|
||||||
Self::TokioUnix(addr) => std::fmt::Debug::fmt(addr, f),
|
|
||||||
#[cfg(unix)]
|
|
||||||
Self::Unix(addr) => std::fmt::Debug::fmt(addr, f),
|
|
||||||
Self::Net(addr) => addr.fmt(f),
|
|
||||||
#[cfg(test)]
|
|
||||||
Self::None => write!(f, "mock client"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! as_sockaddr {
|
|
||||||
($id:ident, $t:ty) => {
|
|
||||||
impl From<$t> for SocketAddr {
|
|
||||||
fn from(value: $t) -> Self {
|
|
||||||
Self::$id(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(unix)]
|
|
||||||
as_sockaddr!(TokioUnix, tokio::net::unix::SocketAddr);
|
|
||||||
#[cfg(unix)]
|
|
||||||
as_sockaddr!(Unix, std::os::unix::net::SocketAddr);
|
|
||||||
as_sockaddr!(Net, std::net::SocketAddr);
|
|
||||||
|
|
||||||
/// Represents a connection to a client. Instead of being owned by the server
|
|
||||||
/// struct, a [`tokio::sync::broadcast::Receiver`] is used to get the serialized
|
|
||||||
/// message and pass it to the client.
|
|
||||||
///
|
|
||||||
/// [`tokio::sync::broadcast::Receiver`]: tokio::sync::broadcast::Receiver
|
|
||||||
struct EventEmitter<T>
|
|
||||||
where
|
|
||||||
T: AsyncWrite + Send + 'static,
|
|
||||||
{
|
|
||||||
transport: Pin<Box<FramedWrite<T, LengthDelimitedCodec>>>,
|
|
||||||
client_addr: SocketAddr,
|
|
||||||
channel: Receiver<Vec<u8>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> EventEmitter<T>
|
|
||||||
where
|
|
||||||
T: AsyncWrite + Send + 'static,
|
|
||||||
{
|
|
||||||
fn new(inner: T, client_addr: SocketAddr, channel: Receiver<Vec<u8>>) -> Self {
|
|
||||||
let transport = FramedWrite::new(inner, LengthDelimitedCodec::new());
|
|
||||||
Self {
|
|
||||||
transport: Box::pin(transport),
|
|
||||||
client_addr,
|
|
||||||
channel,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[instrument(skip(self), fields(client = %self.client_addr))]
|
|
||||||
async fn handler(&mut self) -> bool {
|
|
||||||
match self.channel.recv().await {
|
|
||||||
Ok(msg) => {
|
|
||||||
if let Err(err) = self.transport.send(msg.into()).await {
|
|
||||||
tracing::error!("Couldn't write to client: {err}");
|
|
||||||
false
|
|
||||||
} else {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!("IPC error: {err}");
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run(mut self) {
|
|
||||||
tokio::spawn(async move { while self.handler().await {} });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Handles the creation and sending of events to clients.
|
|
||||||
pub struct EventServer {
|
|
||||||
channel: Sender<Vec<u8>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: consider letting this be configurable
|
|
||||||
const MAX_RECEIVERS: usize = 16;
|
|
||||||
|
|
||||||
impl EventServer {
|
|
||||||
/// Creates a new EventServer.
|
|
||||||
pub fn new() -> Self {
|
|
||||||
let (channel, _) = broadcast::channel(MAX_RECEIVERS);
|
|
||||||
Self { channel }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a future that returns [`Poll::Pending`] until the client count falls below the threshold.
|
|
||||||
pub fn until_available(&self) -> EventServerAvailability<'_> {
|
|
||||||
EventServerAvailability { parent: self }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Whether we're able to take connections.
|
|
||||||
#[inline]
|
|
||||||
fn is_available(&self) -> bool {
|
|
||||||
self.channel.receiver_count() < MAX_RECEIVERS
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Spawns a new [`EventEmitter`] where events will be sent to.
|
|
||||||
pub async fn spawn<T: AsyncWrite + Send + 'static>(
|
|
||||||
&self,
|
|
||||||
inner: T,
|
|
||||||
client_addr: impl Into<SocketAddr>,
|
|
||||||
) -> io::Result<()> {
|
|
||||||
// Sender<T> doesn't have a try_subscribe, so this is our last-ditch
|
|
||||||
// effort to avoid a panic
|
|
||||||
if !self.is_available() {
|
|
||||||
return Err(io::Error::new(io::ErrorKind::Other, "Too many connections"));
|
|
||||||
}
|
|
||||||
|
|
||||||
EventEmitter::new(inner, client_addr.into(), self.channel.subscribe()).run();
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send the given event to all connected clients.
|
|
||||||
pub async fn emit(&self, msg: EventMessage) {
|
|
||||||
let bytes = match serde_json::to_vec(&msg) {
|
|
||||||
Ok(bytes) => bytes,
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!("Failed to serialize: {err}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if self.channel.send(bytes).is_err() {
|
|
||||||
tracing::debug!("Tried to emit an event, but no clients were around to hear it");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for EventServer {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::new()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct EventServerAvailability<'a> {
|
|
||||||
parent: &'a EventServer,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> Future for EventServerAvailability<'a> {
|
|
||||||
type Output = ();
|
|
||||||
|
|
||||||
fn poll(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
_cx: &mut std::task::Context<'_>,
|
|
||||||
) -> std::task::Poll<Self::Output> {
|
|
||||||
use std::task::Poll;
|
|
||||||
|
|
||||||
if self.parent.is_available() {
|
|
||||||
Poll::Ready(())
|
|
||||||
} else {
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,43 +0,0 @@
|
||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
use futures::StreamExt;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
event::{server::SocketAddr, EventMessage, ResourceAction},
|
|
||||||
net::cidr::CidrV4,
|
|
||||||
nzr_event,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn event_serde() {
|
|
||||||
let (rx, tx) = tokio::io::duplex(1024);
|
|
||||||
let server = super::server::EventServer::default();
|
|
||||||
server.spawn(tx, SocketAddr::None).await.unwrap();
|
|
||||||
let mut client = super::client::EventClient::new(rx);
|
|
||||||
let net = CidrV4::from_str("192.0.2.0/24").unwrap();
|
|
||||||
let some_subnet = crate::model::Subnet {
|
|
||||||
name: "whatever".into(),
|
|
||||||
data: crate::model::SubnetData {
|
|
||||||
ifname: "eth0".into(),
|
|
||||||
network: net,
|
|
||||||
start_host: net.make_ip(10).unwrap(),
|
|
||||||
end_host: net.make_ip(254).unwrap(),
|
|
||||||
gateway4: Some(net.make_ip(1).unwrap()),
|
|
||||||
dns: Vec::new(),
|
|
||||||
domain_name: Some("homestarrunnner.net".parse().unwrap()),
|
|
||||||
vlan_id: None,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
nzr_event!(server, Created, some_subnet);
|
|
||||||
let next = client
|
|
||||||
.next()
|
|
||||||
.await
|
|
||||||
.expect("client must receive message")
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let EventMessage::Subnet(net) = next else {
|
|
||||||
panic!("Unexpected event received: {next:?}");
|
|
||||||
};
|
|
||||||
|
|
||||||
assert_eq!(net.action, ResourceAction::Created);
|
|
||||||
}
|
|
|
@ -1,12 +1,9 @@
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
|
|
||||||
use error::ApiError;
|
|
||||||
use model::{CreateStatus, Instance, SshPubkey, Subnet};
|
use model::{CreateStatus, Instance, SshPubkey, Subnet};
|
||||||
|
|
||||||
pub mod args;
|
pub mod args;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod error;
|
|
||||||
pub mod event;
|
|
||||||
#[cfg(feature = "mock")]
|
#[cfg(feature = "mock")]
|
||||||
pub mod mock;
|
pub mod mock;
|
||||||
pub mod model;
|
pub mod model;
|
||||||
|
@ -26,40 +23,40 @@ pub enum InstanceQuery {
|
||||||
#[tarpc::service]
|
#[tarpc::service]
|
||||||
pub trait Nazrin {
|
pub trait Nazrin {
|
||||||
/// Creates a new instance.
|
/// Creates a new instance.
|
||||||
async fn new_instance(build_args: args::NewInstance) -> Result<uuid::Uuid, ApiError>;
|
async fn new_instance(build_args: args::NewInstance) -> Result<uuid::Uuid, String>;
|
||||||
/// Poll for the current status of an instance being created.
|
/// Poll for the current status of an instance being created.
|
||||||
async fn poll_new_instance(task_id: uuid::Uuid) -> Option<CreateStatus>;
|
async fn poll_new_instance(task_id: uuid::Uuid) -> Option<CreateStatus>;
|
||||||
/// Deletes an existing instance.
|
/// Deletes an existing instance.
|
||||||
///
|
///
|
||||||
/// This should involve deleting all related disks and clearing
|
/// This should involve deleting all related disks and clearing
|
||||||
/// the lease information from the subnet data, if any.
|
/// the lease information from the subnet data, if any.
|
||||||
async fn delete_instance(name: String) -> Result<(), ApiError>;
|
async fn delete_instance(name: String) -> Result<(), String>;
|
||||||
/// Gets a single instance by the given InstanceQuery.
|
/// Gets a single instance by the given InstanceQuery.
|
||||||
async fn find_instance(query: InstanceQuery) -> Result<Option<Instance>, ApiError>;
|
async fn find_instance(query: InstanceQuery) -> Result<Option<Instance>, String>;
|
||||||
/// Gets a list of existing instances.
|
/// Gets a list of existing instances.
|
||||||
async fn get_instances(with_status: bool) -> Result<Vec<Instance>, ApiError>;
|
async fn get_instances(with_status: bool) -> Result<Vec<Instance>, String>;
|
||||||
/// Cleans up unusable entries in the database.
|
/// Cleans up unusable entries in the database.
|
||||||
async fn garbage_collect() -> Result<(), ApiError>;
|
async fn garbage_collect() -> Result<(), String>;
|
||||||
/// Creates a new subnet.
|
/// Creates a new subnet.
|
||||||
///
|
///
|
||||||
/// Unlike instances, subnets shouldn't perform any changes to the
|
/// Unlike instances, subnets shouldn't perform any changes to the
|
||||||
/// interfaces they reference. This should be used primarily for
|
/// interfaces they reference. This should be used primarily for
|
||||||
/// ease-of-use and bookkeeping (e.g., assigning dynamic leases).
|
/// ease-of-use and bookkeeping (e.g., assigning dynamic leases).
|
||||||
async fn new_subnet(build_args: Subnet) -> Result<Subnet, ApiError>;
|
async fn new_subnet(build_args: Subnet) -> Result<Subnet, String>;
|
||||||
/// Modifies an existing subnet.
|
/// Modifies an existing subnet.
|
||||||
async fn modify_subnet(edit_args: Subnet) -> Result<Subnet, ApiError>;
|
async fn modify_subnet(edit_args: Subnet) -> Result<Subnet, String>;
|
||||||
/// Gets a list of existing subnets.
|
/// Gets a list of existing subnets.
|
||||||
async fn get_subnets() -> Result<Vec<Subnet>, ApiError>;
|
async fn get_subnets() -> Result<Vec<Subnet>, String>;
|
||||||
/// Deletes an existing subnet.
|
/// Deletes an existing subnet.
|
||||||
async fn delete_subnet(interface: String) -> Result<(), ApiError>;
|
async fn delete_subnet(interface: String) -> Result<(), String>;
|
||||||
/// Gets the cloud-init user-data for the given instance.
|
/// Gets the cloud-init user-data for the given instance.
|
||||||
async fn get_instance_userdata(id: i32) -> Result<Vec<u8>, ApiError>;
|
async fn get_instance_userdata(id: i32) -> Result<Vec<u8>, String>;
|
||||||
/// Gets all SSH keys stored in the database.
|
/// Gets all SSH keys stored in the database.
|
||||||
async fn get_ssh_pubkeys() -> Result<Vec<SshPubkey>, ApiError>;
|
async fn get_ssh_pubkeys() -> Result<Vec<SshPubkey>, String>;
|
||||||
/// Adds a new SSH public key to the database.
|
/// Adds a new SSH public key to the database.
|
||||||
async fn add_ssh_pubkey(pub_key: String) -> Result<SshPubkey, ApiError>;
|
async fn add_ssh_pubkey(pub_key: String) -> Result<SshPubkey, String>;
|
||||||
/// Deletes an SSH public key from the database.
|
/// Deletes an SSH public key from the database.
|
||||||
async fn delete_ssh_pubkey(id: i32) -> Result<(), ApiError>;
|
async fn delete_ssh_pubkey(id: i32) -> Result<(), String>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new NazrinClient.
|
/// Create a new NazrinClient.
|
||||||
|
|
|
@ -1,20 +1,20 @@
|
||||||
use std::net::Ipv4Addr;
|
use std::net::Ipv4Addr;
|
||||||
|
|
||||||
use crate::{args, error::ApiError, model, net::cidr::CidrV4};
|
use crate::{args, model, net::cidr::CidrV4};
|
||||||
|
|
||||||
pub trait NzrClientExt {
|
pub trait NzrClientExt {
|
||||||
#[allow(async_fn_in_trait)]
|
#[allow(async_fn_in_trait)]
|
||||||
async fn new_mock_instance(
|
async fn new_mock_instance(
|
||||||
&mut self,
|
&mut self,
|
||||||
name: impl AsRef<str>,
|
name: impl AsRef<str>,
|
||||||
) -> Result<Result<model::Instance, ApiError>, crate::RpcError>;
|
) -> Result<Result<model::Instance, String>, crate::RpcError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NzrClientExt for crate::NazrinClient {
|
impl NzrClientExt for crate::NazrinClient {
|
||||||
async fn new_mock_instance(
|
async fn new_mock_instance(
|
||||||
&mut self,
|
&mut self,
|
||||||
name: impl AsRef<str>,
|
name: impl AsRef<str>,
|
||||||
) -> Result<Result<model::Instance, ApiError>, crate::RpcError> {
|
) -> Result<Result<model::Instance, String>, crate::RpcError> {
|
||||||
let name = name.as_ref().to_owned();
|
let name = name.as_ref().to_owned();
|
||||||
|
|
||||||
let subnet = self
|
let subnet = self
|
||||||
|
|
|
@ -10,7 +10,6 @@ use futures::{future, StreamExt};
|
||||||
use tokio::{sync::RwLock, task::JoinHandle};
|
use tokio::{sync::RwLock, task::JoinHandle};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
error::{ApiError, ErrorType},
|
|
||||||
model,
|
model,
|
||||||
net::{cidr::CidrV4, mac::MacAddr},
|
net::{cidr::CidrV4, mac::MacAddr},
|
||||||
InstanceQuery, Nazrin, NazrinClient,
|
InstanceQuery, Nazrin, NazrinClient,
|
||||||
|
@ -63,14 +62,14 @@ impl Nazrin for MockServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
build_args: crate::args::NewInstance,
|
build_args: crate::args::NewInstance,
|
||||||
) -> Result<uuid::Uuid, ApiError> {
|
) -> Result<uuid::Uuid, String> {
|
||||||
let mut db = self.db.write().await;
|
let mut db = self.db.write().await;
|
||||||
let Some(net_pos) = db
|
let Some(net_pos) = db
|
||||||
.subnets
|
.subnets
|
||||||
.iter()
|
.iter()
|
||||||
.position(|s| s.as_ref().filter(|s| s.name == build_args.subnet).is_some())
|
.position(|s| s.as_ref().filter(|s| s.name == build_args.subnet).is_some())
|
||||||
else {
|
else {
|
||||||
return Err("Subnet doesn't exist".into());
|
return Err("Subnet doesn't exist".to_owned());
|
||||||
};
|
};
|
||||||
let subnet = db.subnets[net_pos].as_ref().unwrap().clone();
|
let subnet = db.subnets[net_pos].as_ref().unwrap().clone();
|
||||||
let cur_lease = *(db
|
let cur_lease = *(db
|
||||||
|
@ -135,11 +134,7 @@ impl Nazrin for MockServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_instance(
|
async fn delete_instance(self, _: tarpc::context::Context, name: String) -> Result<(), String> {
|
||||||
self,
|
|
||||||
_: tarpc::context::Context,
|
|
||||||
name: String,
|
|
||||||
) -> Result<(), ApiError> {
|
|
||||||
let mut db = self.db.write().await;
|
let mut db = self.db.write().await;
|
||||||
let Some(inst) = db
|
let Some(inst) = db
|
||||||
.instances
|
.instances
|
||||||
|
@ -147,7 +142,7 @@ impl Nazrin for MockServer {
|
||||||
.find(|i| i.as_ref().filter(|i| i.name == name).is_some())
|
.find(|i| i.as_ref().filter(|i| i.name == name).is_some())
|
||||||
.take()
|
.take()
|
||||||
else {
|
else {
|
||||||
return Err("Instance doesn't exist".into());
|
return Err("Instance doesn't exist".to_owned());
|
||||||
};
|
};
|
||||||
inst.take();
|
inst.take();
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -157,7 +152,7 @@ impl Nazrin for MockServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
query: crate::InstanceQuery,
|
query: crate::InstanceQuery,
|
||||||
) -> Result<Option<crate::model::Instance>, ApiError> {
|
) -> Result<Option<crate::model::Instance>, String> {
|
||||||
let db = self.db.read().await;
|
let db = self.db.read().await;
|
||||||
|
|
||||||
let res = {
|
let res = {
|
||||||
|
@ -182,7 +177,7 @@ impl Nazrin for MockServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
id: i32,
|
id: i32,
|
||||||
) -> Result<Vec<u8>, ApiError> {
|
) -> Result<Vec<u8>, String> {
|
||||||
let db = self.db.read().await;
|
let db = self.db.read().await;
|
||||||
let Some(inst) = db
|
let Some(inst) = db
|
||||||
.instances
|
.instances
|
||||||
|
@ -190,7 +185,7 @@ impl Nazrin for MockServer {
|
||||||
.find(|i| i.as_ref().map(|i| i.id == id).is_some())
|
.find(|i| i.as_ref().map(|i| i.id == id).is_some())
|
||||||
.and_then(|o| o.as_ref())
|
.and_then(|o| o.as_ref())
|
||||||
else {
|
else {
|
||||||
return Err("No such instance".into());
|
return Err("No such instance".to_owned());
|
||||||
};
|
};
|
||||||
Ok(db.ci_userdatas.get(&inst.name).cloned().unwrap_or_default())
|
Ok(db.ci_userdatas.get(&inst.name).cloned().unwrap_or_default())
|
||||||
}
|
}
|
||||||
|
@ -199,7 +194,7 @@ impl Nazrin for MockServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
_with_status: bool,
|
_with_status: bool,
|
||||||
) -> Result<Vec<crate::model::Instance>, ApiError> {
|
) -> Result<Vec<crate::model::Instance>, String> {
|
||||||
let db = self.db.read().await;
|
let db = self.db.read().await;
|
||||||
Ok(db
|
Ok(db
|
||||||
.instances
|
.instances
|
||||||
|
@ -212,7 +207,7 @@ impl Nazrin for MockServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
build_args: crate::model::Subnet,
|
build_args: crate::model::Subnet,
|
||||||
) -> Result<crate::model::Subnet, ApiError> {
|
) -> Result<crate::model::Subnet, String> {
|
||||||
let mut db = self.db.write().await;
|
let mut db = self.db.write().await;
|
||||||
let subnet = build_args.clone();
|
let subnet = build_args.clone();
|
||||||
db.subnets.push(Some(build_args));
|
db.subnets.push(Some(build_args));
|
||||||
|
@ -223,14 +218,14 @@ impl Nazrin for MockServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
_edit_args: crate::model::Subnet,
|
_edit_args: crate::model::Subnet,
|
||||||
) -> Result<crate::model::Subnet, ApiError> {
|
) -> Result<crate::model::Subnet, String> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_subnets(
|
async fn get_subnets(
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
) -> Result<Vec<crate::model::Subnet>, ApiError> {
|
) -> Result<Vec<crate::model::Subnet>, String> {
|
||||||
let db = self.db.read().await;
|
let db = self.db.read().await;
|
||||||
Ok(db.subnets.iter().filter_map(|net| net.clone()).collect())
|
Ok(db.subnets.iter().filter_map(|net| net.clone()).collect())
|
||||||
}
|
}
|
||||||
|
@ -239,40 +234,35 @@ impl Nazrin for MockServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
interface: String,
|
interface: String,
|
||||||
) -> Result<(), ApiError> {
|
) -> Result<(), String> {
|
||||||
let mut db = self.db.write().await;
|
let mut db = self.db.write().await;
|
||||||
{
|
db.instances
|
||||||
|
.iter()
|
||||||
|
.filter_map(|inst| inst.as_ref())
|
||||||
|
.for_each(|inst| {
|
||||||
|
if inst.lease.subnet == interface {
|
||||||
|
todo!("what now")
|
||||||
|
}
|
||||||
|
});
|
||||||
let Some(subnet) = db
|
let Some(subnet) = db
|
||||||
.subnets
|
.subnets
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.find(|net| net.as_ref().filter(|n| n.name == interface).is_some())
|
.find(|net| net.as_ref().filter(|n| n.name == interface).is_some())
|
||||||
else {
|
else {
|
||||||
return Err(ErrorType::NotFound.into());
|
return Err("Subnet doesn't exist".to_owned());
|
||||||
};
|
};
|
||||||
subnet.take();
|
subnet.take();
|
||||||
}
|
|
||||||
// Drop all instances that belong to this subnet
|
|
||||||
db.instances.iter_mut().for_each(|inst| {
|
|
||||||
if inst
|
|
||||||
.as_mut()
|
|
||||||
.filter(|inst| inst.lease.subnet != interface)
|
|
||||||
.is_some()
|
|
||||||
{
|
|
||||||
inst.take();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), ApiError> {
|
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), String> {
|
||||||
// no libvirt to compare against, no instances to GC
|
todo!()
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_ssh_pubkeys(
|
async fn get_ssh_pubkeys(
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
) -> Result<Vec<model::SshPubkey>, ApiError> {
|
) -> Result<Vec<model::SshPubkey>, String> {
|
||||||
let db = self.db.read().await;
|
let db = self.db.read().await;
|
||||||
|
|
||||||
Ok(db
|
Ok(db
|
||||||
|
@ -286,7 +276,7 @@ impl Nazrin for MockServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
pub_key: String,
|
pub_key: String,
|
||||||
) -> Result<model::SshPubkey, ApiError> {
|
) -> Result<model::SshPubkey, String> {
|
||||||
let mut key_model = model::SshPubkey::from_str(&pub_key).map_err(|e| e.to_string())?;
|
let mut key_model = model::SshPubkey::from_str(&pub_key).map_err(|e| e.to_string())?;
|
||||||
let mut db = self.db.write().await;
|
let mut db = self.db.write().await;
|
||||||
key_model.id = Some(db.ssh_keys.len() as i32);
|
key_model.id = Some(db.ssh_keys.len() as i32);
|
||||||
|
@ -294,7 +284,7 @@ impl Nazrin for MockServer {
|
||||||
Ok(key_model)
|
Ok(key_model)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), ApiError> {
|
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), String> {
|
||||||
let mut db = self.db.write().await;
|
let mut db = self.db.write().await;
|
||||||
if let Some(key) = db.ssh_keys.get_mut(id as usize) {
|
if let Some(key) = db.ssh_keys.get_mut(id as usize) {
|
||||||
key.take();
|
key.take();
|
||||||
|
|
|
@ -5,10 +5,7 @@ use serde::{Deserialize, Serialize};
|
||||||
use std::{fmt, net::Ipv4Addr};
|
use std::{fmt, net::Ipv4Addr};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
use crate::{
|
use crate::net::{cidr::CidrV4, mac::MacAddr};
|
||||||
error::ApiError,
|
|
||||||
net::{cidr::CidrV4, mac::MacAddr},
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
|
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
|
||||||
#[repr(u32)]
|
#[repr(u32)]
|
||||||
|
@ -70,7 +67,7 @@ impl fmt::Display for DomainState {
|
||||||
pub struct CreateStatus {
|
pub struct CreateStatus {
|
||||||
pub status_text: String,
|
pub status_text: String,
|
||||||
pub completion: f32,
|
pub completion: f32,
|
||||||
pub result: Option<Result<Instance, ApiError>>,
|
pub result: Option<Result<Instance, String>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Struct representing a VM instance.
|
/// Struct representing a VM instance.
|
||||||
|
|
|
@ -26,8 +26,9 @@ tarpc = { version = "0.34", features = [
|
||||||
] }
|
] }
|
||||||
|
|
||||||
# Logging
|
# Logging
|
||||||
tracing = "0.1"
|
# TODO: switch to tracing?
|
||||||
tracing-subscriber = "0.3"
|
log = "0.4.17"
|
||||||
|
syslog = "7"
|
||||||
|
|
||||||
# Database
|
# Database
|
||||||
diesel = { version = "2.2", features = [
|
diesel = { version = "2.2", features = [
|
||||||
|
|
|
@ -1 +1,23 @@
|
||||||
|
pub mod net;
|
||||||
pub mod vm;
|
pub mod vm;
|
||||||
|
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct CommandError(String);
|
||||||
|
|
||||||
|
impl fmt::Display for CommandError {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(f, "{}", self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for CommandError {}
|
||||||
|
|
||||||
|
macro_rules! cmd_error {
|
||||||
|
($($arg:tt)*) => {
|
||||||
|
Box::new(CommandError(format!($($arg)*)))
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) use cmd_error;
|
||||||
|
|
47
nzrd/src/cmd/net.rs
Normal file
47
nzrd/src/cmd/net.rs
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
use super::*;
|
||||||
|
use crate::ctx::Context;
|
||||||
|
use crate::model::tx::Transaction;
|
||||||
|
use crate::model::Subnet;
|
||||||
|
use nzr_api::model;
|
||||||
|
|
||||||
|
pub async fn add_subnet(
|
||||||
|
ctx: &Context,
|
||||||
|
args: model::Subnet,
|
||||||
|
) -> Result<Subnet, Box<dyn std::error::Error>> {
|
||||||
|
let subnet = {
|
||||||
|
let s = Subnet::insert(ctx, args.name, args.data)
|
||||||
|
.await
|
||||||
|
.map_err(|er| cmd_error!("Couldn't generate subnet: {}", er))?;
|
||||||
|
Transaction::begin(ctx, s)
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(err) = ctx.zones.new_zone(&subnet).await {
|
||||||
|
Err(cmd_error!("Failed to create new DNS zone: {}", err))
|
||||||
|
} else {
|
||||||
|
Ok(subnet.take())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn delete_subnet(
|
||||||
|
ctx: &Context,
|
||||||
|
name: impl AsRef<str>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
match Subnet::get_by_name(ctx, name.as_ref())
|
||||||
|
.await
|
||||||
|
.map_err(|er| cmd_error!("Couldn't find subnet: {}", er))?
|
||||||
|
{
|
||||||
|
Some(subnet) => {
|
||||||
|
if let Some(domain_name) = &subnet.domain_name {
|
||||||
|
ctx.zones.delete_zone(domain_name).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
subnet
|
||||||
|
.delete(ctx)
|
||||||
|
.await
|
||||||
|
.map_err(|er| cmd_error!("Couldn't fully delete subnet entry: {}", er))
|
||||||
|
}
|
||||||
|
None => Err(cmd_error!("Subnet not found")),
|
||||||
|
}?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -1,4 +1,3 @@
|
||||||
use nzr_api::error::{ApiError, ErrorType, ToApiResult};
|
|
||||||
use nzr_api::net::cidr::CidrV4;
|
use nzr_api::net::cidr::CidrV4;
|
||||||
use nzr_virt::error::DomainError;
|
use nzr_virt::error::DomainError;
|
||||||
use nzr_virt::xml::build::DomainBuilder;
|
use nzr_virt::xml::build::DomainBuilder;
|
||||||
|
@ -6,14 +5,14 @@ use nzr_virt::xml::{self, InfoMap, SerialType, Sysinfo};
|
||||||
use nzr_virt::{datasize, dom, vol};
|
use nzr_virt::{datasize, dom, vol};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
use crate::ctrl::vm::Progress;
|
use crate::ctrl::vm::Progress;
|
||||||
use crate::ctx::Context;
|
use crate::ctx::Context;
|
||||||
use crate::model::tx::Transaction;
|
|
||||||
use crate::model::{Instance, Subnet};
|
use crate::model::{Instance, Subnet};
|
||||||
|
use log::{debug, info, warn};
|
||||||
|
use nzr_api::args;
|
||||||
use nzr_api::net::mac::MacAddr;
|
use nzr_api::net::mac::MacAddr;
|
||||||
use nzr_api::{args, model, nzr_event};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracing::{debug, info, warn};
|
|
||||||
|
|
||||||
const VIRT_MAC_OUI: &[u8] = &[0x02, 0xf1, 0x0f];
|
const VIRT_MAC_OUI: &[u8] = &[0x02, 0xf1, 0x0f];
|
||||||
|
|
||||||
|
@ -30,21 +29,23 @@ pub async fn new_instance(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
prog_task: Arc<RwLock<Progress>>,
|
prog_task: Arc<RwLock<Progress>>,
|
||||||
args: &args::NewInstance,
|
args: &args::NewInstance,
|
||||||
) -> Result<(Instance, dom::Domain), ApiError> {
|
) -> Result<(Instance, dom::Domain), Box<dyn std::error::Error>> {
|
||||||
progress!(prog_task, 0.0, "Starting...");
|
progress!(prog_task, 0.0, "Starting...");
|
||||||
// find the subnet corresponding to the interface
|
// find the subnet corresponding to the interface
|
||||||
let subnet = Subnet::get_by_name(&ctx, &args.subnet)
|
let subnet = Subnet::get_by_name(&ctx, &args.subnet)
|
||||||
.await
|
.await
|
||||||
.to_api_with("Unable to get interface")?
|
.map_err(|er| cmd_error!("Unable to get interface: {}", er))?
|
||||||
.ok_or::<ApiError>(format!("Subnet {} wasn't found in database", &args.subnet).into())?;
|
.ok_or(cmd_error!(
|
||||||
|
"Subnet {} wasn't found in database",
|
||||||
|
&args.subnet
|
||||||
|
))?;
|
||||||
|
|
||||||
// bail if a domain already exists
|
// bail if a domain already exists
|
||||||
if let Ok(dom) = ctx.virt.conn.get_instance(&args.name).await {
|
if let Ok(dom) = ctx.virt.conn.get_instance(&args.name).await {
|
||||||
Err(format!(
|
Err(cmd_error!(
|
||||||
"Domain with name already exists (uuid {})",
|
"Domain with name already exists (uuid {})",
|
||||||
dom.xml().await.uuid,
|
dom.xml().await.uuid,
|
||||||
)
|
))
|
||||||
.into())
|
|
||||||
} else {
|
} else {
|
||||||
// make sure the base image exists
|
// make sure the base image exists
|
||||||
let mut base_image = ctx
|
let mut base_image = ctx
|
||||||
|
@ -53,7 +54,7 @@ pub async fn new_instance(
|
||||||
.baseimg
|
.baseimg
|
||||||
.volume(&args.base_image)
|
.volume(&args.base_image)
|
||||||
.await
|
.await
|
||||||
.to_api_with("Couldn't find base image")?;
|
.map_err(|er| cmd_error!("Couldn't find base image: {}", er))?;
|
||||||
progress!(prog_task, 10.0, "Generating metadata...");
|
progress!(prog_task, 10.0, "Generating metadata...");
|
||||||
|
|
||||||
// generate a new lease with a new MAC addr
|
// generate a new lease with a new MAC addr
|
||||||
|
@ -61,23 +62,19 @@ pub async fn new_instance(
|
||||||
let bytes = [VIRT_MAC_OUI, rand::random::<[u8; 3]>().as_ref()].concat();
|
let bytes = [VIRT_MAC_OUI, rand::random::<[u8; 3]>().as_ref()].concat();
|
||||||
MacAddr::from_bytes(bytes)
|
MacAddr::from_bytes(bytes)
|
||||||
}
|
}
|
||||||
.to_api_with("Unable to create a new MAC address")?;
|
.map_err(|er| cmd_error!("Unable to create a new MAC address: {}", er))?;
|
||||||
|
|
||||||
// Get highest host addr + 1 for our new addr
|
// Get highest host addr + 1 for our new addr
|
||||||
let addr = {
|
let addr = {
|
||||||
let addr_num = Instance::all_in_subnet(&ctx, &subnet)
|
let addr_num = Instance::all_in_subnet(&ctx, &subnet)
|
||||||
.await
|
.await?
|
||||||
.to_api_with("Couldn't get instances in subnet")?
|
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.max_by(|a, b| a.host_num.cmp(&b.host_num))
|
.max_by(|a, b| a.host_num.cmp(&b.host_num))
|
||||||
.map_or(subnet.start_host, |i| i.host_num + 1);
|
.map_or(subnet.start_host, |i| i.host_num + 1);
|
||||||
if addr_num > subnet.end_host || addr_num < subnet.start_host {
|
if addr_num > subnet.end_host || addr_num < subnet.start_host {
|
||||||
return Err("Got invalid lease address for instance".into());
|
Err(cmd_error!("Got invalid lease address for instance"))?;
|
||||||
}
|
}
|
||||||
let addr = subnet
|
let addr = subnet.network.make_ip(addr_num as u32)?;
|
||||||
.network
|
|
||||||
.make_ip(addr_num as u32)
|
|
||||||
.to_api_with("Unable to generate instance IP")?;
|
|
||||||
CidrV4::new(addr, subnet.network.cidr())
|
CidrV4::new(addr, subnet.network.cidr())
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -88,12 +85,7 @@ pub async fn new_instance(
|
||||||
};
|
};
|
||||||
|
|
||||||
// generate cloud-init data
|
// generate cloud-init data
|
||||||
let db_inst = {
|
let db_inst = Instance::insert(&ctx, &args.name, &subnet, lease.clone(), None).await?;
|
||||||
let inst = Instance::insert(&ctx, &args.name, &subnet, lease.clone(), None)
|
|
||||||
.await
|
|
||||||
.to_api_type(ErrorType::Database)?;
|
|
||||||
Transaction::begin(&ctx, inst)
|
|
||||||
};
|
|
||||||
|
|
||||||
progress!(prog_task, 30.0, "Creating instance images...");
|
progress!(prog_task, 30.0, "Creating instance images...");
|
||||||
// create primary volume from base image
|
// create primary volume from base image
|
||||||
|
@ -104,7 +96,7 @@ pub async fn new_instance(
|
||||||
datasize!((args.disk_sizes.0) GiB),
|
datasize!((args.disk_sizes.0) GiB),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.to_api_with("Failed to clone base image")?;
|
.map_err(|er| cmd_error!("Failed to clone base image: {}", er))?;
|
||||||
|
|
||||||
// and, if it exists: the second volume
|
// and, if it exists: the second volume
|
||||||
let sec_vol = match args.disk_sizes.1 {
|
let sec_vol = match args.disk_sizes.1 {
|
||||||
|
@ -112,11 +104,7 @@ pub async fn new_instance(
|
||||||
let voldata =
|
let voldata =
|
||||||
// TODO: Fix VolType
|
// TODO: Fix VolType
|
||||||
xml::Volume::new(&args.name, xml::VolType::Qcow2, datasize!(sec_size GiB));
|
xml::Volume::new(&args.name, xml::VolType::Qcow2, datasize!(sec_size GiB));
|
||||||
Some(
|
Some(vol::Volume::create(&ctx.virt.pools.secondary, voldata, 0).await?)
|
||||||
vol::Volume::create(&ctx.virt.pools.secondary, voldata, 0)
|
|
||||||
.await
|
|
||||||
.to_api_with("Couldn't create secondary volume")?,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
|
@ -180,20 +168,15 @@ pub async fn new_instance(
|
||||||
.build()
|
.build()
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut virt_dom = ctx
|
let mut virt_dom = ctx.virt.conn.define_instance(dom_xml).await?;
|
||||||
.virt
|
|
||||||
.conn
|
|
||||||
.define_instance(dom_xml)
|
|
||||||
.await
|
|
||||||
.to_api_with("Couldn't define libvirt instance")?;
|
|
||||||
|
|
||||||
// not a fatal error, we can set autostart afterward
|
// not a fatal error, we can set autostart afterward
|
||||||
if let Err(err) = virt_dom.autostart(true).await {
|
if let Err(er) = virt_dom.autostart(true).await {
|
||||||
warn!("Couldn't set autostart for domain: {err}");
|
warn!("Couldn't set autostart for domain: {}", er);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(err) = virt_dom.start().await {
|
if let Err(er) = virt_dom.start().await {
|
||||||
warn!("Domain defined, but couldn't be started! Error: {err}");
|
warn!("Domain defined, but couldn't be started! Error: {}", er);
|
||||||
}
|
}
|
||||||
|
|
||||||
// set all volumes to persistent to avoid deletion
|
// set all volumes to persistent to avoid deletion
|
||||||
|
@ -205,76 +188,40 @@ pub async fn new_instance(
|
||||||
|
|
||||||
progress!(prog_task, 80.0, "Domain created!");
|
progress!(prog_task, 80.0, "Domain created!");
|
||||||
debug!("Domain {} created!", virt_dom.xml().await.name.as_str());
|
debug!("Domain {} created!", virt_dom.xml().await.name.as_str());
|
||||||
Ok((db_inst.take(), virt_dom))
|
Ok((db_inst, virt_dom))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn delete_instance(
|
pub async fn delete_instance(ctx: Context, name: String) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
ctx: Context,
|
let Some(inst_db) = Instance::get_by_name(&ctx, &name).await? else {
|
||||||
name: String,
|
return Err(cmd_error!("Instance {name} not found"));
|
||||||
) -> Result<Option<model::Instance>, ApiError> {
|
|
||||||
let Some(inst_db) = Instance::get_by_name(&ctx, &name)
|
|
||||||
.await
|
|
||||||
.to_api_with_type(ErrorType::Database, "Couldn't find instance")?
|
|
||||||
else {
|
|
||||||
return Err(ErrorType::NotFound.into());
|
|
||||||
};
|
|
||||||
let api_model = match inst_db.api_model(&ctx).await {
|
|
||||||
Ok(model) => Some(model),
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Couldn't get API model to notify clients: {err}");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
// First, destroy the instance
|
// First, destroy the instance
|
||||||
match ctx.virt.conn.get_instance(name.clone()).await {
|
match ctx.virt.conn.get_instance(name.clone()).await {
|
||||||
Ok(mut inst) => {
|
Ok(mut inst) => {
|
||||||
inst.stop().await.to_api_with("Couldn't stop instance")?;
|
inst.stop().await?;
|
||||||
inst.undefine(true)
|
inst.undefine(true).await?;
|
||||||
.await
|
|
||||||
.to_api_with("Couldn't undefine instance")?;
|
|
||||||
}
|
}
|
||||||
Err(DomainError::DomainNotFound) => {
|
Err(DomainError::DomainNotFound) => {
|
||||||
warn!("Deleting instance that exists in DB but not libvirt");
|
warn!("Deleting instance that exists in DB but not libvirt");
|
||||||
}
|
}
|
||||||
Err(err) => Err(ApiError::new(
|
Err(err) => Err(err)?,
|
||||||
nzr_api::error::ErrorType::VirtError,
|
|
||||||
"Couldn't get instance from libvirt",
|
|
||||||
err,
|
|
||||||
))?,
|
|
||||||
}
|
}
|
||||||
// Then, delete the DB entity
|
// Then, delete the DB entity
|
||||||
inst_db
|
inst_db.delete(&ctx).await?;
|
||||||
.delete(&ctx)
|
|
||||||
.await
|
|
||||||
.to_api_with("Couldn't delete from database")?;
|
|
||||||
|
|
||||||
Ok(api_model)
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete all instances that don't have a matching libvirt domain
|
|
||||||
pub async fn prune_instances(ctx: &Context) -> Result<(), Box<dyn std::error::Error>> {
|
pub async fn prune_instances(ctx: &Context) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
for entity in Instance::all(ctx).await? {
|
for entity in Instance::all(ctx).await? {
|
||||||
if let Err(DomainError::DomainNotFound) = ctx.virt.conn.get_instance(&entity.name).await {
|
if let Err(err) = ctx.virt.conn.get_instance(&entity.name).await {
|
||||||
|
if err == DomainError::DomainNotFound {
|
||||||
info!("Invalid domain {}, deleting", &entity.name);
|
info!("Invalid domain {}, deleting", &entity.name);
|
||||||
// First, get the API model to notify clients with
|
|
||||||
let api_model = match entity.api_model(ctx).await {
|
|
||||||
Ok(ent) => Some(ent),
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Couldn't get api model to notify clients: {err}");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// then, delete by name
|
|
||||||
let name = entity.name.clone();
|
let name = entity.name.clone();
|
||||||
if let Err(err) = entity.delete(ctx).await {
|
if let Err(err) = entity.delete(ctx).await {
|
||||||
warn!("Couldn't delete {}: {}", name, err);
|
warn!("Couldn't delete {}: {}", name, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
// and assuming all goes well, notify clients
|
|
||||||
if let Some(ent) = api_model {
|
|
||||||
nzr_event!(ctx.events, Deleted, ent);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,11 +3,13 @@ use diesel::{
|
||||||
SqliteConnection,
|
SqliteConnection,
|
||||||
};
|
};
|
||||||
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
|
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
|
||||||
|
use log::trace;
|
||||||
use nzr_virt::{vol, Connection};
|
use nzr_virt::{vol, Connection};
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
use nzr_api::{config::Config, event::server::EventServer};
|
use crate::dns::ZoneData;
|
||||||
|
use nzr_api::config::Config;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -40,8 +42,8 @@ impl Deref for Context {
|
||||||
pub struct InnerCtx {
|
pub struct InnerCtx {
|
||||||
pub sqldb: diesel::r2d2::Pool<ConnectionManager<SqliteConnection>>,
|
pub sqldb: diesel::r2d2::Pool<ConnectionManager<SqliteConnection>>,
|
||||||
pub config: Config,
|
pub config: Config,
|
||||||
|
pub zones: crate::dns::ZoneData,
|
||||||
pub virt: VirtCtx,
|
pub virt: VirtCtx,
|
||||||
pub events: Arc<EventServer>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
|
@ -58,6 +60,7 @@ pub enum ContextError {
|
||||||
|
|
||||||
impl InnerCtx {
|
impl InnerCtx {
|
||||||
async fn new(config: Config) -> Result<Self, ContextError> {
|
async fn new(config: Config) -> Result<Self, ContextError> {
|
||||||
|
let zones = ZoneData::new(&config.dns);
|
||||||
let conn = Connection::open(&config.libvirt_uri)?;
|
let conn = Connection::open(&config.libvirt_uri)?;
|
||||||
|
|
||||||
let pools = PoolRefs {
|
let pools = PoolRefs {
|
||||||
|
@ -66,7 +69,7 @@ impl InnerCtx {
|
||||||
baseimg: conn.get_pool(&config.storage.base_image_pool).await?,
|
baseimg: conn.get_pool(&config.storage.base_image_pool).await?,
|
||||||
};
|
};
|
||||||
|
|
||||||
tracing::trace!("Connecting to database");
|
trace!("Connecting to database");
|
||||||
let db_uri = config.db_uri.clone();
|
let db_uri = config.db_uri.clone();
|
||||||
let sqldb = tokio::task::spawn_blocking(|| {
|
let sqldb = tokio::task::spawn_blocking(|| {
|
||||||
let manager = ConnectionManager::<SqliteConnection>::new(db_uri);
|
let manager = ConnectionManager::<SqliteConnection>::new(db_uri);
|
||||||
|
@ -77,7 +80,7 @@ impl InnerCtx {
|
||||||
.unwrap()?;
|
.unwrap()?;
|
||||||
|
|
||||||
{
|
{
|
||||||
tracing::trace!("Running pending migrations");
|
trace!("Running pending migrations");
|
||||||
let mut conn = sqldb.get()?;
|
let mut conn = sqldb.get()?;
|
||||||
tokio::task::spawn_blocking(move || {
|
tokio::task::spawn_blocking(move || {
|
||||||
conn.run_pending_migrations(MIGRATIONS)
|
conn.run_pending_migrations(MIGRATIONS)
|
||||||
|
@ -87,13 +90,11 @@ impl InnerCtx {
|
||||||
.unwrap()?;
|
.unwrap()?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let events = Arc::new(EventServer::new());
|
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
sqldb,
|
sqldb,
|
||||||
config,
|
config,
|
||||||
|
zones,
|
||||||
virt: VirtCtx { conn, pools },
|
virt: VirtCtx { conn, pools },
|
||||||
events,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,14 @@
|
||||||
|
use crate::model::Subnet;
|
||||||
|
use log::*;
|
||||||
use nzr_api::config::DNSConfig;
|
use nzr_api::config::DNSConfig;
|
||||||
use std::borrow::Borrow;
|
use std::borrow::Borrow;
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
|
use std::net::Ipv4Addr;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::{Mutex, RwLock};
|
use tokio::sync::{Mutex, RwLock};
|
||||||
|
|
||||||
use nzr_api::model::{Instance, SubnetData};
|
|
||||||
|
|
||||||
use hickory_proto::rr::Name;
|
use hickory_proto::rr::Name;
|
||||||
use hickory_server::authority::{AuthorityObject, Catalog};
|
use hickory_server::authority::{AuthorityObject, Catalog};
|
||||||
use hickory_server::proto::rr::{rdata::soa, RData, RecordSet};
|
use hickory_server::proto::rr::{rdata::soa, RData, RecordSet};
|
||||||
|
@ -69,7 +70,7 @@ pub struct InnerZD {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn make_rectree_with_soa(name: &Name, config: &DNSConfig) -> BTreeMap<RrKey, RecordSet> {
|
pub fn make_rectree_with_soa(name: &Name, config: &DNSConfig) -> BTreeMap<RrKey, RecordSet> {
|
||||||
tracing::debug!("Creating initial SOA for {}", &name);
|
debug!("Creating initial SOA for {}", &name);
|
||||||
let mut records: BTreeMap<RrKey, RecordSet> = BTreeMap::new();
|
let mut records: BTreeMap<RrKey, RecordSet> = BTreeMap::new();
|
||||||
let soa_key = RrKey::new(
|
let soa_key = RrKey::new(
|
||||||
LowerName::from(name),
|
LowerName::from(name),
|
||||||
|
@ -118,28 +119,24 @@ impl InnerZD {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new DNS zone for the given subnet.
|
/// Creates a new DNS zone for the given subnet.
|
||||||
pub async fn new_zone(
|
pub async fn new_zone(&self, subnet: &Subnet) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
&self,
|
|
||||||
zone_id: impl AsRef<str>,
|
|
||||||
subnet: &SubnetData,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
if let Some(name) = &subnet.domain_name {
|
if let Some(name) = &subnet.domain_name {
|
||||||
let rectree = make_rectree_with_soa(name, &self.config);
|
let name: Name = name.parse()?;
|
||||||
|
let rectree = make_rectree_with_soa(&name, &self.config);
|
||||||
let auth = InMemoryAuthority::new(
|
let auth = InMemoryAuthority::new(
|
||||||
name.clone(),
|
name,
|
||||||
rectree,
|
rectree,
|
||||||
hickory_server::authority::ZoneType::Primary,
|
hickory_server::authority::ZoneType::Primary,
|
||||||
false,
|
false,
|
||||||
)?;
|
)?;
|
||||||
self.import(zone_id.as_ref(), auth).await;
|
self.import(&subnet.ifname.to_string(), auth).await;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generates a zone with the given records.
|
pub async fn import(&self, name: &str, auth: InMemoryAuthority) {
|
||||||
async fn import(&self, name: &str, auth: InMemoryAuthority) {
|
|
||||||
let auth_arc = Arc::new(auth);
|
let auth_arc = Arc::new(auth);
|
||||||
tracing::debug!(
|
log::debug!(
|
||||||
"Importing {} with {} records...",
|
"Importing {} with {} records...",
|
||||||
name,
|
name,
|
||||||
auth_arc.records().await.len()
|
auth_arc.records().await.len()
|
||||||
|
@ -162,23 +159,26 @@ impl InnerZD {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Adds a new host record in the DNS zone.
|
/// Adds a new host record in the DNS zone.
|
||||||
pub async fn new_record(&self, inst: &Instance) -> Result<(), Box<dyn std::error::Error>> {
|
pub async fn new_record(
|
||||||
let hostname = Name::from_str(&inst.name)?;
|
&self,
|
||||||
|
interface: &str,
|
||||||
|
name: &str,
|
||||||
|
addr: Ipv4Addr,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let hostname = Name::from_str(name)?;
|
||||||
let zones = self.map.lock().await;
|
let zones = self.map.lock().await;
|
||||||
let zone = zones.get(&inst.lease.subnet).unwrap_or(&self.default_zone);
|
let zone = zones.get(interface).unwrap_or(&self.default_zone);
|
||||||
let fqdn = {
|
let fqdn = {
|
||||||
let origin: Name = zone.origin().into();
|
let origin: Name = zone.origin().into();
|
||||||
hostname.append_domain(&origin)?
|
hostname.append_domain(&origin)?
|
||||||
};
|
};
|
||||||
|
|
||||||
tracing::debug!(
|
log::debug!(
|
||||||
"Creating new host entry {} in zone {}...",
|
"Creating new host entry {} in zone {}...",
|
||||||
&fqdn,
|
&fqdn,
|
||||||
zone.origin()
|
zone.origin()
|
||||||
);
|
);
|
||||||
|
|
||||||
let addr = inst.lease.addr.addr;
|
|
||||||
|
|
||||||
let record = Record::from_rdata(fqdn, 3600, RData::A(addr.into()));
|
let record = Record::from_rdata(fqdn, 3600, RData::A(addr.into()));
|
||||||
zone.upsert(record, 0).await;
|
zone.upsert(record, 0).await;
|
||||||
self.catalog()
|
self.catalog()
|
||||||
|
@ -189,10 +189,14 @@ impl InnerZD {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn delete_record(&self, inst: &Instance) -> Result<bool, Box<dyn std::error::Error>> {
|
pub async fn delete_record(
|
||||||
let hostname = Name::from_str(&inst.name)?;
|
&self,
|
||||||
|
interface: &str,
|
||||||
|
name: &str,
|
||||||
|
) -> Result<bool, Box<dyn std::error::Error>> {
|
||||||
|
let hostname = Name::from_str(name)?;
|
||||||
let mut zones = self.map.lock().await;
|
let mut zones = self.map.lock().await;
|
||||||
if let Some(zone) = zones.get_mut(&inst.lease.subnet) {
|
if let Some(zone) = zones.get_mut(interface) {
|
||||||
let hostname: LowerName = hostname.into();
|
let hostname: LowerName = hostname.into();
|
||||||
self.catalog.0.write().await.remove(&hostname);
|
self.catalog.0.write().await.remove(&hostname);
|
||||||
let key = RrKey::new(hostname, hickory_server::proto::rr::RecordType::A);
|
let key = RrKey::new(hostname, hickory_server::proto::rr::RecordType::A);
|
|
@ -1,16 +0,0 @@
|
||||||
use std::io;
|
|
||||||
|
|
||||||
use tokio::net::UnixListener;
|
|
||||||
|
|
||||||
use crate::ctx::Context;
|
|
||||||
|
|
||||||
pub async fn event_server(ctx: Context) -> io::Result<()> {
|
|
||||||
let sock = UnixListener::bind(&ctx.config.rpc.events_sock)?;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
// Wait until we have an available slot for a connection
|
|
||||||
ctx.events.until_available().await;
|
|
||||||
let (client, addr) = sock.accept().await?;
|
|
||||||
ctx.events.spawn(client, addr).await?;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,41 +1,79 @@
|
||||||
mod cmd;
|
mod cmd;
|
||||||
mod ctrl;
|
mod ctrl;
|
||||||
mod ctx;
|
mod ctx;
|
||||||
mod event;
|
mod dns;
|
||||||
mod model;
|
mod model;
|
||||||
mod rpc;
|
mod rpc;
|
||||||
|
|
||||||
use std::str::FromStr;
|
use hickory_server::ServerFuture;
|
||||||
|
use log::LevelFilter;
|
||||||
|
use log::*;
|
||||||
|
use model::{Instance, Subnet};
|
||||||
use nzr_api::config;
|
use nzr_api::config;
|
||||||
|
use std::{net::IpAddr, str::FromStr};
|
||||||
|
use tokio::net::UdpSocket;
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let cfg: config::Config = config::Config::figment().extract()?;
|
let cfg: config::Config = config::Config::figment().extract()?;
|
||||||
let ctx = ctx::Context::new(cfg).await?;
|
let ctx = ctx::Context::new(cfg).await?;
|
||||||
|
|
||||||
let mut bad_loglevel = false;
|
syslog::init_unix(
|
||||||
let log_level = tracing::Level::from_str(&ctx.config.log_level).unwrap_or_else(|_| {
|
syslog::Facility::LOG_DAEMON,
|
||||||
bad_loglevel = true;
|
LevelFilter::from_str(ctx.config.log_level.as_str())?,
|
||||||
tracing::Level::WARN
|
)?;
|
||||||
});
|
|
||||||
|
|
||||||
tracing_subscriber::fmt().with_max_level(log_level).init();
|
info!("Hydrating initial zones...");
|
||||||
|
for subnet in Subnet::all(&ctx).await? {
|
||||||
|
// A records
|
||||||
|
if let Err(err) = ctx.zones.new_zone(&subnet).await {
|
||||||
|
error!("Couldn't create zone for {}: {}", &subnet.ifname, err);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
match Instance::all_in_subnet(&ctx, &subnet).await {
|
||||||
|
Ok(leases) => {
|
||||||
|
for lease in leases {
|
||||||
|
let Ok(lease_addr) = subnet.network.make_ip(lease.host_num as u32) else {
|
||||||
|
warn!("Ignoring {} due to lease address issue", &lease.name);
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
if bad_loglevel {
|
if let Err(err) = ctx
|
||||||
tracing::warn!("Couldn't parse log level from config, defaulting to {log_level}");
|
.zones
|
||||||
|
.new_record(&subnet.ifname.to_string(), &lease.name, lease_addr)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
error!(
|
||||||
|
"Failed to set up lease for {} in {}: {}",
|
||||||
|
&lease.name, &subnet.ifname, err
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("Couldn't get leases for {}: {}", &subnet.ifname, err);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run both the RPC and events servers
|
// DNS init
|
||||||
|
let mut dns_listener = ServerFuture::new(ctx.zones.catalog());
|
||||||
|
let dns_socket = {
|
||||||
|
let dns_ip: IpAddr = ctx.config.dns.listen_addr.parse()?;
|
||||||
|
UdpSocket::bind((dns_ip, ctx.config.dns.port)).await?
|
||||||
|
};
|
||||||
|
dns_listener.register_socket(dns_socket);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
res = rpc::serve(ctx.clone()) => {
|
res = rpc::serve(ctx.clone()) => {
|
||||||
if let Err(err) = res {
|
if let Err(err) = res {
|
||||||
tracing::error!("RPC server error: {err}");
|
error!("Error from RPC: {}", err);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
res = event::event_server(ctx.clone()) => {
|
res = dns_listener.block_until_done() => {
|
||||||
if let Err(err) = res {
|
if let Err(err) = res {
|
||||||
tracing::error!("Event server error: {err}");
|
error!("Error from DNS: {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,14 +20,12 @@ use tx::Transactable;
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum ModelError {
|
pub enum ModelError {
|
||||||
#[error("{0}")]
|
#[error("Database error occurred: {0}")]
|
||||||
Db(#[from] diesel::result::Error),
|
Db(#[from] diesel::result::Error),
|
||||||
#[error("Database pool error ({0})")]
|
#[error("Unable to get database handle: {0}")]
|
||||||
Pool(#[from] diesel::r2d2::PoolError),
|
Pool(#[from] diesel::r2d2::PoolError),
|
||||||
#[error("{0}")]
|
#[error("{0}")]
|
||||||
Cidr(#[from] cidr::Error),
|
Cidr(#[from] cidr::Error),
|
||||||
#[error("Instance belongs to a subnet that has since disappeared")]
|
|
||||||
NoSubnet,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
diesel::table! {
|
diesel::table! {
|
||||||
|
@ -249,7 +247,10 @@ impl Instance {
|
||||||
|
|
||||||
/// Creates an [nzr_api::model::Instance] from the information available in
|
/// Creates an [nzr_api::model::Instance] from the information available in
|
||||||
/// the database.
|
/// the database.
|
||||||
pub async fn api_model(&self, ctx: &Context) -> Result<nzr_api::model::Instance, ModelError> {
|
pub async fn api_model(
|
||||||
|
&self,
|
||||||
|
ctx: &Context,
|
||||||
|
) -> Result<nzr_api::model::Instance, Box<dyn std::error::Error>> {
|
||||||
let netid = self.subnet_id;
|
let netid = self.subnet_id;
|
||||||
let Some(subnet) = ctx
|
let Some(subnet) = ctx
|
||||||
.spawn_db(move |mut db| Subnet::table().find(netid).load::<Subnet>(&mut db))
|
.spawn_db(move |mut db| Subnet::table().find(netid).load::<Subnet>(&mut db))
|
||||||
|
@ -257,7 +258,7 @@ impl Instance {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.next()
|
.next()
|
||||||
else {
|
else {
|
||||||
return Err(ModelError::NoSubnet);
|
todo!("something went horribly wrong");
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(nzr_api::model::Instance {
|
Ok(nzr_api::model::Instance {
|
||||||
|
@ -403,7 +404,7 @@ impl Subnet {
|
||||||
// the API.
|
// the API.
|
||||||
Ok(addr) => Some(addr),
|
Ok(addr) => Some(addr),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
tracing::error!(
|
log::error!(
|
||||||
"Error parsing DNS server '{}' for {}: {}",
|
"Error parsing DNS server '{}' for {}: {}",
|
||||||
s,
|
s,
|
||||||
&self.name,
|
&self.name,
|
||||||
|
@ -415,7 +416,7 @@ impl Subnet {
|
||||||
.collect(),
|
.collect(),
|
||||||
domain_name: self.domain_name.as_ref().map(|s| {
|
domain_name: self.domain_name.as_ref().map(|s| {
|
||||||
Name::from_str(s).unwrap_or_else(|e| {
|
Name::from_str(s).unwrap_or_else(|e| {
|
||||||
tracing::error!("Error parsing DNS name for {}: {}", &self.name, e);
|
log::error!("Error parsing DNS name for {}: {}", &self.name, e);
|
||||||
Name::default()
|
Name::default()
|
||||||
})
|
})
|
||||||
}),
|
}),
|
||||||
|
|
|
@ -48,7 +48,7 @@ impl<'a, T: Transactable> Drop for Transaction<'a, T> {
|
||||||
let ctx = self.ctx.clone();
|
let ctx = self.ctx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(err) = inner.undo_tx(&ctx).await {
|
if let Err(err) = inner.undo_tx(&ctx).await {
|
||||||
tracing::error!("Error undoing transaction: {err}");
|
log::error!("Error undoing transaction: {err}");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
158
nzrd/src/rpc.rs
158
nzrd/src/rpc.rs
|
@ -1,6 +1,5 @@
|
||||||
use futures::{future, StreamExt};
|
use futures::{future, StreamExt};
|
||||||
use nzr_api::error::{ApiError, ErrorType, ToApiResult};
|
use nzr_api::{args, model, InstanceQuery, Nazrin};
|
||||||
use nzr_api::{args, model, nzr_event, InstanceQuery, Nazrin};
|
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tarpc::server::{BaseChannel, Channel};
|
use tarpc::server::{BaseChannel, Channel};
|
||||||
|
@ -14,8 +13,8 @@ use uuid::Uuid;
|
||||||
use crate::cmd;
|
use crate::cmd;
|
||||||
use crate::ctx::Context;
|
use crate::ctx::Context;
|
||||||
use crate::model::{Instance, SshPubkey, Subnet};
|
use crate::model::{Instance, SshPubkey, Subnet};
|
||||||
|
use log::*;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use tracing::*;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct NzrServer {
|
pub struct NzrServer {
|
||||||
|
@ -37,7 +36,7 @@ impl Nazrin for NzrServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
build_args: args::NewInstance,
|
build_args: args::NewInstance,
|
||||||
) -> Result<uuid::Uuid, ApiError> {
|
) -> Result<uuid::Uuid, String> {
|
||||||
let progress = Arc::new(RwLock::new(crate::ctrl::vm::Progress {
|
let progress = Arc::new(RwLock::new(crate::ctrl::vm::Progress {
|
||||||
status_text: "Starting...".to_owned(),
|
status_text: "Starting...".to_owned(),
|
||||||
percentage: 0.0,
|
percentage: 0.0,
|
||||||
|
@ -45,11 +44,13 @@ impl Nazrin for NzrServer {
|
||||||
let prog_task = progress.clone();
|
let prog_task = progress.clone();
|
||||||
let build_task = tokio::spawn(async move {
|
let build_task = tokio::spawn(async move {
|
||||||
let (inst, dom) =
|
let (inst, dom) =
|
||||||
cmd::vm::new_instance(self.ctx.clone(), prog_task.clone(), &build_args).await?;
|
cmd::vm::new_instance(self.ctx.clone(), prog_task.clone(), &build_args)
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Instance creation failed: {}", e))?;
|
||||||
let mut api_model = inst
|
let mut api_model = inst
|
||||||
.api_model(&self.ctx)
|
.api_model(&self.ctx)
|
||||||
.await
|
.await
|
||||||
.to_api_with("Couldn't generate API response")?;
|
.map_err(|e| format!("Couldn't generate API response: {e}"))?;
|
||||||
match dom.state().await {
|
match dom.state().await {
|
||||||
Ok(state) => {
|
Ok(state) => {
|
||||||
api_model.state = state.into();
|
api_model.state = state.into();
|
||||||
|
@ -58,9 +59,6 @@ impl Nazrin for NzrServer {
|
||||||
warn!("Unable to get instance state: {err}");
|
warn!("Unable to get instance state: {err}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inform event listeners
|
|
||||||
nzr_event!(self.ctx.events, Created, api_model);
|
|
||||||
Ok(api_model)
|
Ok(api_model)
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -96,7 +94,7 @@ impl Nazrin for NzrServer {
|
||||||
Some(
|
Some(
|
||||||
task.inner
|
task.inner
|
||||||
.await
|
.await
|
||||||
.to_api_with("Task failed with panic")
|
.map_err(|err| format!("Task failed with panic: {}", err))
|
||||||
.and_then(|res| res),
|
.and_then(|res| res),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
|
@ -110,16 +108,10 @@ impl Nazrin for NzrServer {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_instance(
|
async fn delete_instance(self, _: tarpc::context::Context, name: String) -> Result<(), String> {
|
||||||
self,
|
cmd::vm::delete_instance(self.ctx.clone(), name)
|
||||||
_: tarpc::context::Context,
|
.await
|
||||||
name: String,
|
.map_err(|e| format!("Couldn't delete instance: {}", e))?;
|
||||||
) -> Result<(), ApiError> {
|
|
||||||
let api_model = cmd::vm::delete_instance(self.ctx.clone(), name).await?;
|
|
||||||
|
|
||||||
if let Some(api_model) = api_model {
|
|
||||||
nzr_event!(self.ctx.events, Deleted, api_model);
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,16 +119,18 @@ impl Nazrin for NzrServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
query: nzr_api::InstanceQuery,
|
query: nzr_api::InstanceQuery,
|
||||||
) -> Result<Option<model::Instance>, ApiError> {
|
) -> Result<Option<model::Instance>, String> {
|
||||||
let res = match query {
|
let res = match query {
|
||||||
InstanceQuery::Name(name) => Instance::get_by_name(&self.ctx, name).await,
|
InstanceQuery::Name(name) => Instance::get_by_name(&self.ctx, name).await,
|
||||||
InstanceQuery::MacAddr(addr) => Instance::get_by_mac(&self.ctx, addr).await,
|
InstanceQuery::MacAddr(addr) => Instance::get_by_mac(&self.ctx, addr).await,
|
||||||
InstanceQuery::Ipv4Addr(addr) => Instance::get_by_ip4(&self.ctx, addr).await,
|
InstanceQuery::Ipv4Addr(addr) => Instance::get_by_ip4(&self.ctx, addr).await,
|
||||||
}
|
}
|
||||||
.to_api()?;
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
if let Some(inst) = res {
|
if let Some(inst) = res {
|
||||||
inst.api_model(&self.ctx).await.to_api().map(Some)
|
inst.api_model(&self.ctx)
|
||||||
|
.await
|
||||||
|
.map_or_else(|e| Err(e.to_string()), |m| Ok(Some(m)))
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
@ -146,10 +140,10 @@ impl Nazrin for NzrServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
with_status: bool,
|
with_status: bool,
|
||||||
) -> Result<Vec<model::Instance>, ApiError> {
|
) -> Result<Vec<model::Instance>, String> {
|
||||||
let db_models = Instance::all(&self.ctx)
|
let db_models = Instance::all(&self.ctx)
|
||||||
.await
|
.await
|
||||||
.to_api_type(ErrorType::Database)?;
|
.map_err(|e| format!("Unable to get all instances: {e}"))?;
|
||||||
let mut models = Vec::new();
|
let mut models = Vec::new();
|
||||||
for inst in db_models {
|
for inst in db_models {
|
||||||
let mut api_model = match inst.api_model(&self.ctx).await {
|
let mut api_model = match inst.api_model(&self.ctx).await {
|
||||||
|
@ -187,39 +181,34 @@ impl Nazrin for NzrServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
build_args: model::Subnet,
|
build_args: model::Subnet,
|
||||||
) -> Result<model::Subnet, ApiError> {
|
) -> Result<model::Subnet, String> {
|
||||||
let subnet = Subnet::insert(&self.ctx, build_args.name, build_args.data)
|
cmd::net::add_subnet(&self.ctx, build_args)
|
||||||
.await
|
.await
|
||||||
.to_api_type(ErrorType::Database)?
|
.map_err(|e| e.to_string())?
|
||||||
.api_model()
|
.api_model()
|
||||||
.to_api_with("Unable to generate API model")?;
|
.map_err(|e| e.to_string())
|
||||||
|
|
||||||
// inform event listeners
|
|
||||||
nzr_event!(self.ctx.events, Created, subnet);
|
|
||||||
Ok(subnet)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn modify_subnet(
|
async fn modify_subnet(
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
edit_args: model::Subnet,
|
edit_args: model::Subnet,
|
||||||
) -> Result<model::Subnet, ApiError> {
|
) -> Result<model::Subnet, String> {
|
||||||
if let Some(subnet) = Subnet::get_by_name(&self.ctx, &edit_args.name)
|
if let Some(subnet) = Subnet::get_by_name(&self.ctx, &edit_args.name)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| e.to_string())?
|
.map_err(|e| e.to_string())?
|
||||||
{
|
{
|
||||||
Err("Modifying subnets not yet supported".into())
|
todo!("support updating Subnets")
|
||||||
} else {
|
} else {
|
||||||
Err(ErrorType::NotFound.into())
|
Err(format!("Subnet {} not found", &edit_args.name))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_subnets(self, _: tarpc::context::Context) -> Result<Vec<model::Subnet>, ApiError> {
|
async fn get_subnets(self, _: tarpc::context::Context) -> Result<Vec<model::Subnet>, String> {
|
||||||
Subnet::all(&self.ctx)
|
Subnet::all(&self.ctx).await.map_or_else(
|
||||||
.await
|
|e| Err(e.to_string()),
|
||||||
.to_api_with("Couldn't get list of subnets")
|
|v| {
|
||||||
.map(|v| {
|
Ok(v.into_iter()
|
||||||
v.into_iter()
|
|
||||||
.filter_map(|s| match s.api_model() {
|
.filter_map(|s| match s.api_model() {
|
||||||
Ok(model) => Some(model),
|
Ok(model) => Some(model),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
@ -227,43 +216,23 @@ impl Nazrin for NzrServer {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect()
|
.collect())
|
||||||
})
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_subnet(
|
async fn delete_subnet(
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
subnet_name: String,
|
subnet_name: String,
|
||||||
) -> Result<(), ApiError> {
|
) -> Result<(), String> {
|
||||||
if let Some(subnet) = Subnet::get_by_name(&self.ctx, subnet_name)
|
cmd::net::delete_subnet(&self.ctx, &subnet_name)
|
||||||
.await
|
.await
|
||||||
.to_api_type(ErrorType::Database)?
|
.map_err(|e| e.to_string())?;
|
||||||
{
|
|
||||||
let api_model = match subnet.api_model() {
|
|
||||||
Ok(model) => Some(model),
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!("Unable to generate model for clients: {err}");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
subnet
|
|
||||||
.delete(&self.ctx)
|
|
||||||
.await
|
|
||||||
.to_api_type(ErrorType::Database)?;
|
|
||||||
|
|
||||||
if let Some(api_model) = api_model {
|
|
||||||
nzr_event!(&self.ctx.events, Deleted, api_model);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
|
||||||
Err(ErrorType::NotFound.into())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), ApiError> {
|
async fn garbage_collect(self, _: tarpc::context::Context) -> Result<(), String> {
|
||||||
cmd::vm::prune_instances(&self.ctx)
|
cmd::vm::prune_instances(&self.ctx)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| e.to_string())?;
|
.map_err(|e| e.to_string())?;
|
||||||
|
@ -274,52 +243,50 @@ impl Nazrin for NzrServer {
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
id: i32,
|
id: i32,
|
||||||
) -> Result<Vec<u8>, ApiError> {
|
) -> Result<Vec<u8>, String> {
|
||||||
if let Some(db_model) = Instance::get(&self.ctx, id)
|
let Some(db_model) = Instance::get(&self.ctx, id)
|
||||||
.await
|
.await
|
||||||
.to_api_type(ErrorType::Database)?
|
.map_err(|e| e.to_string())?
|
||||||
{
|
else {
|
||||||
|
return Err("Instance doesn't exist".to_owned());
|
||||||
|
};
|
||||||
|
|
||||||
Ok(db_model.ci_userdata.unwrap_or_default())
|
Ok(db_model.ci_userdata.unwrap_or_default())
|
||||||
} else {
|
|
||||||
Err(ErrorType::NotFound.into())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_ssh_pubkeys(
|
async fn get_ssh_pubkeys(
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
) -> Result<Vec<model::SshPubkey>, ApiError> {
|
) -> Result<Vec<model::SshPubkey>, String> {
|
||||||
SshPubkey::all(&self.ctx)
|
SshPubkey::all(&self.ctx).await.map_or_else(
|
||||||
.await
|
|e| Err(e.to_string()),
|
||||||
.to_api_type(ErrorType::Database)
|
|k| Ok(k.iter().map(|k| k.api_model()).collect()),
|
||||||
.map(|k| k.iter().map(|k| k.api_model()).collect())
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn add_ssh_pubkey(
|
async fn add_ssh_pubkey(
|
||||||
self,
|
self,
|
||||||
_: tarpc::context::Context,
|
_: tarpc::context::Context,
|
||||||
pub_key: String,
|
pub_key: String,
|
||||||
) -> Result<model::SshPubkey, ApiError> {
|
) -> Result<model::SshPubkey, String> {
|
||||||
let pubkey = model::SshPubkey::from_str(&pub_key).to_api_type(ErrorType::Parse)?;
|
let pubkey = model::SshPubkey::from_str(&pub_key).map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
SshPubkey::insert(&self.ctx, pubkey.algorithm, pubkey.key_data, pubkey.comment)
|
SshPubkey::insert(&self.ctx, pubkey.algorithm, pubkey.key_data, pubkey.comment)
|
||||||
.await
|
.await
|
||||||
.to_api_type(ErrorType::Database)
|
.map_err(|e| e.to_string())
|
||||||
.map(|k| k.api_model())
|
.map(|k| k.api_model())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), ApiError> {
|
async fn delete_ssh_pubkey(self, _: tarpc::context::Context, id: i32) -> Result<(), String> {
|
||||||
if let Some(key) = SshPubkey::get(&self.ctx, id)
|
let Some(key) = SshPubkey::get(&self.ctx, id)
|
||||||
.await
|
.await
|
||||||
.to_api_type(ErrorType::Database)?
|
.map_err(|e| e.to_string())?
|
||||||
{
|
else {
|
||||||
key.delete(&self.ctx)
|
return Err("SSH key with ID doesn't exist".into());
|
||||||
.await
|
};
|
||||||
.to_api_type(ErrorType::Database)?;
|
|
||||||
|
key.delete(&self.ctx).await.map_err(|e| e.to_string())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
|
||||||
Err(ErrorType::NotFound.into())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -357,6 +324,7 @@ pub async fn serve(ctx: Context) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
debug!("Listening for new connection...");
|
debug!("Listening for new connection...");
|
||||||
let (conn, _addr) = listener.accept().await?;
|
let (conn, _addr) = listener.accept().await?;
|
||||||
let ctx = ctx.clone();
|
let ctx = ctx.clone();
|
||||||
|
// hack?
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let framed = codec_builder.new_framed(conn);
|
let framed = codec_builder.new_framed(conn);
|
||||||
let transport = tarpc::serde_transport::new(framed, Bincode::default());
|
let transport = tarpc::serde_transport::new(framed, Bincode::default());
|
||||||
|
@ -372,6 +340,6 @@ pub async fn serve(ctx: Context) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
}
|
}
|
||||||
|
|
||||||
struct InstCreateStatus {
|
struct InstCreateStatus {
|
||||||
inner: JoinHandle<Result<model::Instance, ApiError>>,
|
inner: JoinHandle<Result<model::Instance, String>>,
|
||||||
progress: Arc<RwLock<crate::ctrl::vm::Progress>>,
|
progress: Arc<RwLock<crate::ctrl::vm::Progress>>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,7 +125,11 @@ async fn handle_message(ctx: &Context, from: SocketAddr, msg: &Message) {
|
||||||
|
|
||||||
{
|
{
|
||||||
let opts = response.opts_mut();
|
let opts = response.opts_mut();
|
||||||
let giaddr = msg.giaddr();
|
let giaddr = if msg.giaddr().is_unspecified() {
|
||||||
|
todo!("no relay??")
|
||||||
|
} else {
|
||||||
|
msg.giaddr()
|
||||||
|
};
|
||||||
|
|
||||||
opts.insert(DhcpOption::ServerIdentifier(giaddr));
|
opts.insert(DhcpOption::ServerIdentifier(giaddr));
|
||||||
if let Some(time) = lease_time {
|
if let Some(time) = lease_time {
|
||||||
|
|
|
@ -1,15 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "nzrdns"
|
|
||||||
version = "0.1.0"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
|
|
||||||
nzr-api = { path = "../nzr-api" }
|
|
||||||
hickory-server = "0.24"
|
|
||||||
hickory-proto = { version = "0.24", features = ["serde-config"] }
|
|
||||||
tracing = "0.1"
|
|
||||||
tracing-subscriber = "0.3"
|
|
||||||
async-trait = "0.1"
|
|
||||||
futures = "0.3"
|
|
||||||
anyhow = "1"
|
|
|
@ -1,169 +0,0 @@
|
||||||
use std::{net::IpAddr, process::ExitCode};
|
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use dns::ZoneData;
|
|
||||||
use futures::StreamExt;
|
|
||||||
use hickory_server::ServerFuture;
|
|
||||||
use nzr_api::{
|
|
||||||
config::Config,
|
|
||||||
event::{client::EventClient, EventMessage, ResourceAction},
|
|
||||||
NazrinClient,
|
|
||||||
};
|
|
||||||
use tokio::{
|
|
||||||
io::AsyncRead,
|
|
||||||
net::{UdpSocket, UnixStream},
|
|
||||||
};
|
|
||||||
|
|
||||||
mod dns;
|
|
||||||
|
|
||||||
/// Function to handle incoming events from Nazrin and update the DNS database
|
|
||||||
/// accordingly.
|
|
||||||
async fn event_handler<T: AsyncRead>(zones: ZoneData, mut events: EventClient<T>) {
|
|
||||||
while let Some(event) = events.next().await {
|
|
||||||
match event {
|
|
||||||
Ok(EventMessage::Instance(event)) => {
|
|
||||||
let ent = &event.entity;
|
|
||||||
match event.action {
|
|
||||||
ResourceAction::Created => {
|
|
||||||
if let Err(err) = zones.new_record(ent).await {
|
|
||||||
tracing::error!("Unable to add record {}: {err}", ent.name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ResourceAction::Deleted => {
|
|
||||||
if let Err(err) = zones.delete_record(ent).await {
|
|
||||||
tracing::error!("Unable to delete record {}: {err}", ent.name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
misc => {
|
|
||||||
tracing::debug!("ignoring instance action {misc:?}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(EventMessage::Subnet(event)) => {
|
|
||||||
let ent = &event.entity;
|
|
||||||
match event.action {
|
|
||||||
ResourceAction::Created => {
|
|
||||||
if let Some(name) = ent.data.domain_name.as_ref() {
|
|
||||||
if let Err(err) = zones.new_zone(&ent.name, &ent.data).await {
|
|
||||||
tracing::error!("Unable to add zone {name}: {err}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ResourceAction::Deleted => {
|
|
||||||
if ent.data.domain_name.as_ref().is_some() {
|
|
||||||
zones.delete_zone(&ent.name).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
misc => {
|
|
||||||
tracing::debug!("ignoring subnet action {misc:?}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!("Error getting events: {err}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tracing::warn!("No more events! (did Nazrin shut down?)");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Hydrates all existing DNS zones.
|
|
||||||
async fn hydrate_zones(zones: ZoneData, api_client: NazrinClient) -> anyhow::Result<()> {
|
|
||||||
tracing::info!("Hydrating initial zones...");
|
|
||||||
let subnets = api_client
|
|
||||||
.get_subnets(nzr_api::default_ctx())
|
|
||||||
.await
|
|
||||||
.context("RPC error getting subnets")?
|
|
||||||
.map_err(|e| anyhow::anyhow!("API error getting subnets: {e}"))?;
|
|
||||||
|
|
||||||
let instances = api_client
|
|
||||||
.get_instances(nzr_api::default_ctx(), false)
|
|
||||||
.await
|
|
||||||
.context("RPC error getting instances")?
|
|
||||||
.map_err(|e| anyhow::anyhow!("API error getting instances: {e}"))?;
|
|
||||||
|
|
||||||
for subnet in subnets {
|
|
||||||
if let Err(err) = zones.new_zone(&subnet.name, &subnet.data).await {
|
|
||||||
tracing::warn!("Couldn't create zone for {}: {err}", &subnet.name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for instance in instances {
|
|
||||||
if let Err(err) = zones.new_record(&instance).await {
|
|
||||||
tracing::warn!("Couldn't create zone entry for {}: {err}", &instance.name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> ExitCode {
|
|
||||||
tracing_subscriber::fmt::init();
|
|
||||||
let cfg: Config = match Config::figment().extract() {
|
|
||||||
Ok(cfg) => cfg,
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!("Error parsing config: {err}");
|
|
||||||
return ExitCode::FAILURE;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let api_client = {
|
|
||||||
let sock = match UnixStream::connect(&cfg.rpc.socket_path).await {
|
|
||||||
Ok(sock) => sock,
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!("Connection to nzrd failed: {err}");
|
|
||||||
return ExitCode::FAILURE;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
nzr_api::new_client(sock)
|
|
||||||
};
|
|
||||||
let events = {
|
|
||||||
let sock = match UnixStream::connect(&cfg.rpc.events_sock).await {
|
|
||||||
Ok(sock) => sock,
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!("Connections to events stream failed: {err}");
|
|
||||||
return ExitCode::FAILURE;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
nzr_api::event::client::EventClient::new(sock)
|
|
||||||
};
|
|
||||||
|
|
||||||
let zones = ZoneData::new(&cfg.dns);
|
|
||||||
|
|
||||||
if let Err(err) = hydrate_zones(zones.clone(), api_client.clone()).await {
|
|
||||||
tracing::error!("{err}");
|
|
||||||
return ExitCode::FAILURE;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut dns_listener = ServerFuture::new(zones.catalog());
|
|
||||||
let dns_socket = {
|
|
||||||
let Ok(dns_ip) = cfg.dns.listen_addr.parse::<IpAddr>() else {
|
|
||||||
tracing::error!("Unable to parse listen_addr");
|
|
||||||
return ExitCode::FAILURE;
|
|
||||||
};
|
|
||||||
|
|
||||||
match UdpSocket::bind((dns_ip, cfg.dns.port)).await {
|
|
||||||
Ok(sock) => sock,
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!("Couldn't bind to {dns_ip}:{}: {err}", cfg.dns.port);
|
|
||||||
return ExitCode::FAILURE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
dns_listener.register_socket(dns_socket);
|
|
||||||
|
|
||||||
tokio::select! {
|
|
||||||
_ = event_handler(zones.clone(), events) => {
|
|
||||||
// nothing to do here
|
|
||||||
},
|
|
||||||
res = dns_listener.block_until_done() => {
|
|
||||||
if let Err(err) = res {
|
|
||||||
tracing::error!("Error from DNS: {err}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ExitCode::SUCCESS
|
|
||||||
}
|
|
|
@ -57,7 +57,7 @@ impl Context {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_sshkeys(&self) -> Result<Vec<SshPubkey>> {
|
pub async fn get_sshkeys(&self) -> Result<Vec<SshPubkey>> {
|
||||||
// We don't cache SSH keys, so always get from the API server
|
// TODO: do we cache SSH keys? I don't like the idea of it
|
||||||
let ssh_keys = self
|
let ssh_keys = self
|
||||||
.api_client
|
.api_client
|
||||||
.get_ssh_pubkeys(nzr_api::default_ctx())
|
.get_ssh_pubkeys(nzr_api::default_ctx())
|
||||||
|
|
|
@ -41,6 +41,7 @@ async fn get_meta_data(
|
||||||
Ok(Some(inst)) => {
|
Ok(Some(inst)) => {
|
||||||
let meta = Metadata {
|
let meta = Metadata {
|
||||||
inst_name: &inst.name,
|
inst_name: &inst.name,
|
||||||
|
// XXX: this is very silly imo
|
||||||
ssh_pubkeys: ssh_pubkeys.iter().collect(),
|
ssh_pubkeys: ssh_pubkeys.iter().collect(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -98,7 +99,7 @@ async fn get_vendor_data(
|
||||||
// admin username from an unknown instance.
|
// admin username from an unknown instance.
|
||||||
if let IpAddr::V4(ip) = addr.ip() {
|
if let IpAddr::V4(ip) = addr.ip() {
|
||||||
match ctx.get_instance(ip).await {
|
match ctx.get_instance(ip).await {
|
||||||
Ok(Some(_)) => {
|
Ok(_) => {
|
||||||
let data = model::VendorData {
|
let data = model::VendorData {
|
||||||
username: Some(&ctx.cfg().cloud.admin_user),
|
username: Some(&ctx.cfg().cloud.admin_user),
|
||||||
};
|
};
|
||||||
|
@ -107,14 +108,14 @@ async fn get_vendor_data(
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
StatusCode::INTERNAL_SERVER_ERROR
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
|
||||||
tracing::warn!("Request from unregistered server {ip}");
|
|
||||||
Err(StatusCode::FORBIDDEN)
|
|
||||||
}
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
tracing::error!("{err}");
|
tracing::error!("{err}");
|
||||||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
}
|
}
|
||||||
|
_ => {
|
||||||
|
tracing::warn!("Request from unregistered server {ip}");
|
||||||
|
Err(StatusCode::FORBIDDEN)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Err(StatusCode::BAD_REQUEST)
|
Err(StatusCode::BAD_REQUEST)
|
||||||
|
|
Loading…
Reference in a new issue