libvirt volume refactor

This commit is contained in:
snow flurry 2023-06-30 12:44:39 -07:00
parent f2c5d1073d
commit 4b6f469a58
10 changed files with 304 additions and 64 deletions

10
Cargo.lock generated
View file

@ -1590,6 +1590,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.7"
@ -1863,6 +1872,7 @@ dependencies = [
"mio",
"num_cpus",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys",

View file

@ -42,6 +42,7 @@ pub struct Config {
pub rpc: RPCConfig,
pub log_level: String,
pub db_path: PathBuf,
pub qemu_img_path: Option<PathBuf>,
pub libvirt_uri: String,
pub storage: StorageConfig,
pub dns: DNSConfig,
@ -51,6 +52,7 @@ impl Default for Config {
fn default() -> Self {
Self {
log_level: "WARN".to_owned(),
qemu_img_path: None,
rpc: RPCConfig {
socket_path: PathBuf::from("/var/run/nazrin/nzrd.sock"),
admin_group: None,

View file

@ -31,10 +31,10 @@ pub struct NewInstanceArgs {
/// Memory to assign, in MiB
#[arg(short, long, default_value_t = 1024)]
mem: u32,
/// Primary HDD size, in MiB
/// Primary HDD size, in GiB
#[arg(short, long, default_value_t = 20)]
primary_size: u32,
/// Secndary HDD size, in MiB
/// Secndary HDD size, in GiB
#[arg(short, long)]
secondary_size: Option<u32>,
/// File containing a list of SSH keys to use

View file

@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
tarpc = { version = "0.31", features = ["tokio1", "unix", "serde-transport", "serde-transport-bincode"] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "process"] }
tokio-serde = { version = "0.8.0", features = ["bincode"] }
sled = "0.34.7"
# virt = "0.2.11"
@ -34,9 +34,9 @@ trust-dns-server = "0.22.0"
log = "0.4.17"
syslog = "6.0.1"
nix = "0.26.1"
tempdir = "0.3.7"
[dev-dependencies]
tempdir = "0.3.7"
regex = "1"
[[bin]]

View file

@ -90,7 +90,7 @@ pub async fn new_instance(
// and upload it to a vol
let vol_data = Volume::new(&args.name, VolType::Raw, datasize!(1440 KiB));
let mut cidata_vol = VirtVolume::create_xml(&ctx.virt.pools.cidata, vol_data, 0)?;
let mut cidata_vol = VirtVolume::create_xml(&ctx.virt.pools.cidata, vol_data, 0).await?;
let cistream = Stream::new(&cidata_vol.get_connect()?, 0)?;
if let Err(er) = cidata_vol.upload(&cistream, 0, datasize!(1440 KiB).into(), 0) {
@ -120,6 +120,7 @@ pub async fn new_instance(
&args.name,
datasize!((args.disk_sizes.0) GiB),
)
.await
.map_err(|er| cmd_error!("Failed to clone base image: {}", er))?;
// and, if it exists: the second volume
@ -130,11 +131,7 @@ pub async fn new_instance(
ctx.virt.pools.secondary.xml.vol_type(),
datasize!(sec_size GiB),
);
Some(VirtVolume::create_xml(
&ctx.virt.pools.secondary,
voldata,
0,
)?)
Some(VirtVolume::create_xml(&ctx.virt.pools.secondary, voldata, 0).await?)
}
None => None,
};
@ -163,6 +160,7 @@ pub async fn new_instance(
.disk_device(|dsk| {
dsk.volume_source(pri_name, &pri_vol.name)
.target("vda", "virtio")
.qcow2()
.boot_order(1)
})
.disk_device(|fda| {
@ -183,6 +181,7 @@ pub async fn new_instance(
Some(vol) => instdata.disk_device(|dsk| {
dsk.volume_source(sec_name, &vol.name)
.target("vdb", "virtio")
.qcow2()
}),
None => instdata,
}

View file

@ -208,6 +208,14 @@ impl DiskBuilder {
self
}
pub fn qcow2(mut self) -> Self {
self.disk.driver = Some(DiskDriver {
name: "qemu".to_owned(),
r#type: Some("qcow2".to_owned()),
});
self
}
/// Set the source for a block device
pub fn block_source(mut self, dev: &str) -> Self {
self.disk.r#type = DiskType::Block;

View file

@ -239,6 +239,7 @@ pub enum IfaceType {
None,
}
#[skip_serializing_none]
#[derive(Default, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct NetDevice {
#[serde(rename = "@type")]
@ -381,7 +382,7 @@ impl Default for OsData {
}
}
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq)]
pub enum SizeUnit {
#[serde(rename = "bytes")]
Bytes,
@ -409,6 +410,15 @@ impl std::fmt::Display for SizeUnit {
}
}
impl Serialize for SizeUnit {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.to_string())
}
}
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SizeInfo {
#[serde(rename = "@unit")]
@ -437,6 +447,17 @@ impl std::fmt::Display for SizeInfo {
}
}
impl SizeInfo {
pub fn qemu_style(&self) -> String {
if self.unit == SizeUnit::Bytes {
self.amount.to_string()
} else {
let unit_str = self.unit.to_string();
format!("{}{}", self.amount, unit_str.chars().next().unwrap())
}
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CpuTopology {
#[serde(rename = "@sockets")]
@ -472,6 +493,7 @@ pub struct Volume {
pub target: Option<VolTarget>,
}
#[derive(PartialEq, Eq)]
pub enum VolType {
Qcow2,
Block,
@ -513,23 +535,39 @@ impl Volume {
key: None,
}
}
pub fn vol_type(&self) -> Option<VolType> {
if let Some(target) = &self.target {
if let Some(format) = &target.format {
let t = match format.r#type.as_str() {
"qcow2" => VolType::Qcow2,
"raw" => VolType::Raw,
"block" => VolType::Block,
_ => VolType::Invalid,
};
return Some(t);
}
}
None
}
}
#[skip_serializing_none]
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct VolTarget {
path: Option<String>,
format: Option<TargetFormat>,
pub format: Option<TargetFormat>,
}
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct TargetFormat {
#[serde(rename = "@type")]
r#type: String,
pub r#type: String,
}
// =^..^= =^..^= =^..^= =^..^= =^..^= =^..^= =^..^= =^..^=
#[skip_serializing_none]
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename = "pool")]
pub struct Pool {

150
nzrd/src/img.rs Normal file
View file

@ -0,0 +1,150 @@
use std::{
fs::File,
ops::Deref,
path::{Path, PathBuf},
};
use std::future::Future;
use tempdir::TempDir;
use tokio::process::Command;
use crate::ctrl::virtxml::SizeInfo;
#[derive(Debug)]
pub struct ImgError {
message: String,
command_output: Option<String>,
}
impl std::fmt::Display for ImgError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(output) = &self.command_output {
write!(f, "{}\n output from command: {}", self.message, output)
} else {
write!(f, "{}", self.message)
}
}
}
impl From<std::io::Error> for ImgError {
fn from(value: std::io::Error) -> Self {
Self {
message: format!("IO Error: {}", value),
command_output: None,
}
}
}
impl std::error::Error for ImgError {}
impl ImgError {
fn new<S>(message: S) -> Self
where
S: AsRef<str>,
{
Self {
message: message.as_ref().to_owned(),
command_output: None,
}
}
fn with_output<S>(message: S, output: S) -> Self
where
S: AsRef<str>,
{
Self {
message: message.as_ref().to_owned(),
command_output: Some(output.as_ref().to_owned()),
}
}
}
/// Locates where qemu-img is on the server system
fn qemu_img_path() -> Result<PathBuf, ImgError> {
if let Ok(path_var) = std::env::var("PATH") {
for path in path_var.split(':') {
let qemu_img = Path::new(path).join("qemu-img");
if qemu_img.exists() {
return Ok(qemu_img);
}
}
Err(ImgError::new("couldn't find qemu-img in PATH; you may want to define where qemu-img is in nazrin.conf"))
} else {
Err(ImgError::new(
"can't read PATH; you may want to define where qemu-img is in nazrin.conf",
))
}
}
// =^..^= =^..^= =^..^= =^..^= =^..^= =^..^= =^..^= =^..^=
pub struct Qcow2Img {
_img_dir: TempDir,
img_file: File,
}
impl Deref for Qcow2Img {
type Target = File;
fn deref(&self) -> &Self::Target {
&self.img_file
}
}
fn cmd_err(err: std::io::Error) -> ImgError {
ImgError::new(format!("failed to invoke qemu-img: {}", err))
}
async fn run_qemu_img<F, Fut>(func: F) -> Result<Qcow2Img, ImgError>
where
F: FnOnce(Command, &Path) -> Fut,
Fut: Future<Output = std::io::Result<std::process::Output>>,
{
let qi_path = qemu_img_path()?;
let img_dir = TempDir::new("nazrin")?;
let img_path = img_dir.path().join("img");
let qi_out = func(Command::new(&qi_path), &img_path)
.await
.map_err(cmd_err)?;
if !qi_out.status.success() {
Err(ImgError::with_output(
"qemu-img failed",
&String::from_utf8_lossy(&qi_out.stderr),
))
} else {
Ok(Qcow2Img {
_img_dir: img_dir,
img_file: File::open(&img_path)
.expect("Couldn't open image file after qemu-img wrote to it"),
})
}
}
pub async fn create_qcow2(size: SizeInfo) -> Result<Qcow2Img, ImgError> {
run_qemu_img(|mut cmd, img_path| {
cmd.arg("create")
.arg("-f")
.arg("qcow2")
.arg(img_path)
.arg(size.qemu_style())
.output()
})
.await
}
pub async fn clone_qcow2<P>(from: P, size: SizeInfo) -> Result<Qcow2Img, ImgError>
where
P: AsRef<Path>,
{
run_qemu_img(|mut cmd, img_path| {
std::fs::copy(&from, img_path).expect("Copy failed");
cmd.arg("resize")
.arg(img_path)
.arg(size.qemu_style())
.output()
})
.await
}

View file

@ -3,6 +3,7 @@ mod cmd;
mod ctrl;
mod ctx;
mod dns;
mod img;
mod prelude;
mod rpc;
#[cfg(test)]

View file

@ -2,6 +2,8 @@ use std::io::{prelude::*, BufReader};
use std::{fmt::Display, ops::Deref};
use virt::{storage_pool::StoragePool, storage_vol::StorageVol, stream::Stream};
use crate::ctrl::virtxml::VolType;
use crate::img;
use crate::{
ctrl::virtxml::{Pool, SizeInfo, Volume},
prelude::*,
@ -16,15 +18,86 @@ pub struct VirtVolume {
}
impl VirtVolume {
pub fn create_xml(
fn upload_img(from: &std::fs::File, to: Stream) -> Result<(), PoolError> {
let buf_cap: u64 = datasize!(4 MiB).into();
let mut reader = BufReader::with_capacity(buf_cap as usize, from);
loop {
let read_bytes = {
// read from the source file...
let data = match reader.fill_buf() {
Ok(buf) => buf,
Err(er) => {
if let Err(er) = to.abort() {
warn!("Stream abort failed: {}", er);
}
return Err(PoolError::FileError(er));
}
};
if data.is_empty() {
break;
}
debug!("pulled {} bytes", data.len());
// ... and then send upstream
let mut send_idx = 0;
while send_idx < data.len() {
match to.send(&data[send_idx..]) {
Ok(sz) => {
send_idx += sz;
}
Err(er) => {
if let Err(er) = to.abort() {
warn!("Stream abort failed: {}", er);
}
return Err(PoolError::UploadError(er));
}
}
}
data.len()
};
debug!("consuming {} bytes", read_bytes);
reader.consume(read_bytes);
}
Ok(())
}
pub async fn create_xml(
pool: &StoragePool,
xmldata: Volume,
flags: u32,
) -> Result<Self, Box<dyn std::error::Error>> {
let xml = quick_xml::se::to_string(&xmldata)?;
let svol = StorageVol::create_xml(pool, &xml, flags)?;
if xmldata.vol_type() == Some(VolType::Qcow2) {
let size = xmldata.capacity.unwrap().clone();
let src_img = img::create_qcow2(size).await?;
let stream = match Stream::new(&svol.get_connect().map_err(PoolError::VirtError)?, 0) {
Ok(s) => s,
Err(er) => {
svol.delete(0).ok();
return Err(Box::new(er));
}
};
let img_size = src_img.metadata().unwrap().len();
if let Err(er) = svol.upload(&stream, 0, img_size, 0) {
svol.delete(0).ok();
return Err(Box::new(PoolError::CantUpload(er)));
}
Self::upload_img(&*src_img, stream)?;
}
Ok(Self {
inner: StorageVol::create_xml(pool, &xml, flags)?,
inner: svol,
persist: false,
name: xmldata.name,
})
@ -42,7 +115,7 @@ impl VirtVolume {
})
}
pub fn clone_vol(
pub async fn clone_vol(
&mut self,
pool: &VirtPool,
vol_name: &str,
@ -52,7 +125,9 @@ impl VirtVolume {
let src_path = self.get_path().map_err(PoolError::NoPath)?;
let src_fd = std::fs::File::open(src_path).map_err(PoolError::FileError)?;
let src_img = img::clone_qcow2(src_path, size.clone())
.await
.map_err(PoolError::QemuError)?;
let newvol = Volume::new(vol_name, pool.xml.vol_type(), size);
let newxml_str = quick_xml::se::to_string(&newvol).map_err(PoolError::SerdeError)?;
@ -95,59 +170,14 @@ impl VirtVolume {
}
};
let img_size = src_fd.metadata().unwrap().len();
let img_size = src_img.metadata().unwrap().len();
if let Err(er) = cloned.upload(&stream, 0, img_size, 0) {
cloned.delete(0).ok();
return Err(PoolError::CantUpload(er));
}
let buf_cap: u64 = datasize!(4 MiB).into();
let mut reader = BufReader::with_capacity(buf_cap as usize, src_fd);
loop {
let read_bytes = {
// read from the source file...
let data = match reader.fill_buf() {
Ok(buf) => buf,
Err(er) => {
if let Err(er) = stream.abort() {
warn!("Stream abort failed: {}", er);
}
if let Err(er) = cloned.delete(0) {
warn!("Couldn't delete destination volume: {}", er);
}
return Err(PoolError::FileError(er));
}
};
if data.is_empty() {
break;
}
// ... and then send upstream
let mut send_idx = 0;
while send_idx < data.len() {
match stream.send(&data[send_idx..]) {
Ok(sz) => {
send_idx += sz;
}
Err(er) => {
if let Err(er) = stream.abort() {
warn!("Stream abort failed: {}", er);
}
if let Err(er) = cloned.delete(0) {
warn!("Couldn't delete destination volume: {}", er);
}
return Err(PoolError::UploadError(er));
}
}
}
data.len()
};
reader.consume(read_bytes);
}
Self::upload_img(&*src_img, stream)?;
Ok(Self {
inner: cloned,
@ -181,6 +211,7 @@ pub enum PoolError {
FileError(std::io::Error),
CantUpload(virt::error::Error),
UploadError(virt::error::Error),
QemuError(img::ImgError),
}
impl Display for PoolError {
@ -192,6 +223,7 @@ impl Display for PoolError {
Self::FileError(er) => er.fmt(f),
Self::CantUpload(er) => write!(f, "Unable to start upload to image: {}", er),
Self::UploadError(er) => write!(f, "Failed to upload image: {}", er),
Self::QemuError(er) => er.fmt(f),
}
}
}