use std::io::{prelude::*, BufReader}; use std::sync::Arc; use virt::{storage_pool::StoragePool, storage_vol::StorageVol, stream::Stream}; use crate::error::VirtError; use crate::xml::SizeInfo; use crate::{error::PoolError, xml}; use crate::{img, Connection}; /// An abstracted representation of a libvirt volume. pub struct Volume { virt: Arc, pub persist: bool, pub name: String, } impl Volume { /// Upload a disk image from libvirt in a blocking task async fn upload_img(from: impl Read + Send + 'static, to: Stream) -> Result<(), PoolError> { let mut reader = BufReader::with_capacity(4294967296, from); tokio::task::spawn_blocking(move || { loop { // We can't borrow reader as mut twice. As such, most of the function is stored in this let read_bytes = { // Read from file let data = match reader.fill_buf() { Ok(buf) => buf, Err(err) => { if let Err(err) = to.abort() { tracing::warn!("Failed to abort stream: {err}"); } return Err(PoolError::FileError(err)); } }; if data.is_empty() { break; } tracing::trace!("read {} bytes", data.len()); // Send to libvirt let mut send_idx = 0; while send_idx < data.len() { tracing::trace!("sending {} bytes", data.len() - send_idx); match to.send(&data[send_idx..]) { Ok(len) => { send_idx += len; } Err(err) => { if let Err(err) = to.abort() { tracing::warn!("Stream abort failed: {err}"); } return Err(PoolError::VirtError(err)); } } } data.len() }; reader.consume(read_bytes); } Ok(()) }) .await .unwrap() } /// Creates a [VirtVolume] from the given [Volume](crate::xml::Volume) XML data. pub async fn create(pool: &Pool, xml: xml::Volume, flags: u32) -> Result { let virt_pool = pool.virt.clone(); let xml_str = quick_xml::se::to_string(&xml).map_err(PoolError::XmlError)?; let vol = { let xml_str = xml_str.clone(); let vol = tokio::task::spawn_blocking(move || { StorageVol::create_xml(&virt_pool, &xml_str, flags).map_err(PoolError::VirtError) }) .await .unwrap()?; Arc::new(vol) }; if xml.vol_type() == Some(xml::VolType::Qcow2) { let size = xml.capacity.unwrap(); let src_img = img::create_qcow2(size) .await .map_err(PoolError::QemuError)?; let stream_vol = vol.clone(); let stream = tokio::task::spawn_blocking(move || { match Stream::new(&stream_vol.get_connect().map_err(PoolError::VirtError)?, 0) { Ok(s) => Ok(s), Err(err) => { stream_vol.delete(0).ok(); Err(PoolError::VirtError(err)) } } }) .await .unwrap()?; let img_size = src_img.metadata().unwrap().len(); if let Err(err) = vol.upload(&stream, 0, img_size, 0) { vol.delete(0).ok(); return Err(PoolError::CantUpload(err)); } let upload_fh = src_img.try_clone().map_err(PoolError::FileError)?; Self::upload_img(upload_fh, stream).await?; } let name = xml.name.clone(); Ok(Self { virt: vol, persist: false, name, }) } /// Finds a volume by the given pool and name. async fn get(pool: &Pool, name: &str) -> Result { let pool = pool.virt.clone(); let name = name.to_owned(); tokio::task::spawn_blocking(move || { let vol = StorageVol::lookup_by_name(&pool, &name).map_err(PoolError::VirtError)?; Ok(Self { virt: Arc::new(vol), // default to persisting when looking up by name persist: true, name, }) }) .await .unwrap() } /// Permanently deletes the volume. pub async fn delete(&self) -> Result<(), VirtError> { let virt = self.virt.clone(); tokio::task::spawn_blocking(move || virt.delete(0)) .await .unwrap() } /// Clones the data to a new libvirt volume. pub async fn clone_vol( &mut self, pool: &Pool, vol_name: impl AsRef, size: SizeInfo, ) -> Result { let vol_name = vol_name.as_ref(); tracing::debug!("Cloning volume to {vol_name} ({size})"); let virt = self.virt.clone(); let src_path = tokio::task::spawn_blocking(move || virt.get_path().map_err(PoolError::NoPath)) .await .unwrap()?; let src_img = img::clone_qcow2(src_path, size) .await .map_err(PoolError::QemuError)?; let newvol = xml::Volume::new(vol_name, pool.xml.vol_type(), size); let newxml_str = quick_xml::se::to_string(&newvol).map_err(PoolError::XmlError)?; tracing::debug!("Creating new vol..."); let pool_virt = pool.virt.clone(); let cloned = tokio::task::spawn_blocking(move || { StorageVol::create_xml(&pool_virt, &newxml_str, 0).map_err(PoolError::VirtError) }) .await .unwrap()?; match cloned.get_info() { Ok(info) => { if info.capacity != u64::from(size) { tracing::debug!( "libvirt set wrong size {}, trying this again...", info.capacity ); if let Err(er) = cloned.resize(size.into(), 0) { if let Err(er) = cloned.delete(0) { tracing::warn!("Resizing disk failed, and couldn't clean up: {}", er); } return Err(PoolError::VirtError(er)); } } else { tracing::debug!( "capacity is correct ({} bytes), allocation = {} bytes", info.capacity, info.allocation, ); } } Err(er) => { if let Err(er) = cloned.delete(0) { tracing::warn!("Couldn't clean up destination volume: {}", er); } return Err(PoolError::VirtError(er)); } } let stream = { let virt_conn = cloned.get_connect().map_err(PoolError::VirtError)?; let cloned = cloned.clone(); tokio::task::spawn_blocking(move || match Stream::new(&virt_conn, 0) { Ok(s) => Ok(s), Err(er) => { cloned.delete(0).ok(); Err(PoolError::VirtError(er)) } }) .await .unwrap() }?; let img_size = src_img.metadata().unwrap().len(); { let stream = stream.clone(); let cloned = cloned.clone(); tokio::task::spawn_blocking(move || { if let Err(er) = cloned.upload(&stream, 0, img_size, 0) { cloned.delete(0).ok(); Err(PoolError::CantUpload(er)) } else { Ok(()) } }) .await .unwrap()?; } let stream_fh = src_img.try_clone().map_err(PoolError::FileError)?; Self::upload_img(stream_fh, stream).await?; Ok(Self { virt: Arc::new(cloned), persist: false, name: vol_name.to_owned(), }) } } impl Drop for Volume { fn drop(&mut self) { if !self.persist { tracing::debug!("Deleting volume {}", &self.name); self.virt.delete(0).ok(); } } } pub struct Pool { virt: Arc, xml: xml::Pool, } impl AsRef for Pool { fn as_ref(&self) -> &StoragePool { &self.virt } } impl Pool { pub(crate) async fn get(conn: &Connection, id: impl AsRef) -> Result { let conn = conn.virtconn.clone(); let id = id.as_ref().to_owned(); tokio::task::spawn_blocking(move || { let inner = StoragePool::lookup_by_name(&conn, &id).map_err(PoolError::VirtError)?; if !inner.is_active().map_err(PoolError::VirtError)? { inner.create(0).map_err(PoolError::VirtError)?; } let xml_str = inner.get_xml_desc(0).map_err(PoolError::VirtError)?; let xml = quick_xml::de::from_str(&xml_str).map_err(PoolError::XmlError)?; Ok(Self { virt: Arc::new(inner), xml, }) }) .await .unwrap() } pub async fn volume(&self, name: impl AsRef) -> Result { Volume::get(self, name.as_ref()).await } }