From a9d5e03bc2eff2e23574ec20dea53bc3862deece Mon Sep 17 00:00:00 2001 From: doriancodes Date: Thu, 19 Dec 2024 00:12:35 +0100 Subject: [PATCH] partially fix mount --- src/main.rs | 33 +++++--- src/modules/mount.rs | 75 ++++++++++++++--- src/modules/session.rs | 182 ++++++++++++++++++++++++++--------------- 3 files changed, 203 insertions(+), 87 deletions(-) diff --git a/src/main.rs b/src/main.rs index 453b129..ef7f69c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,8 +2,9 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use froggr::modules::namespace::BindMode; use froggr::modules::session::SessionManager; -use log::{error, info}; +use log::{debug, error, info}; use std::path::PathBuf; +use env_logger; #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -65,7 +66,17 @@ enum Commands { #[tokio::main] async fn main() -> Result<()> { + // Initialize the logger at the start of main + env_logger::init(); + + info!("Froggr starting up"); + let cli = Cli::parse(); + + if cli.verbose { + debug!("Verbose mode enabled"); + } + let session_manager = SessionManager::new()?; match &cli.command { @@ -93,13 +104,17 @@ async fn main() -> Result<()> { std::thread::sleep(std::time::Duration::from_secs(1)); } Commands::Mount { source, mount_point, node_id } => { - info!("Starting mount operation"); + info!("Starting mount operation in process {}", std::process::id()); let session_manager = SessionManager::new()?; + info!("Created session manager"); + let session_id = session_manager.create_session(mount_point.clone())?; + info!("Created session: {}", session_id); println!("Created new session: {}", session_id); - if let Some(session_info) = session_manager.get_session(&session_id)? { - info!("Found session with PID {}", session_info.pid); + if let Some(session) = session_manager.get_session(&session_id)? { + info!("Found session with PID {}", session.pid); + info!("Sending mount command..."); session_manager.send_mount_command( &session_id, source.clone(), @@ -107,15 +122,13 @@ async fn main() -> Result<()> { node_id.clone() )?; info!("Mount command sent to session"); - - std::thread::sleep(std::time::Duration::from_secs(1)); - - if let Some(updated_info) = session_manager.get_session(&session_id)? { - info!("Current mounts: {:?}", updated_info.mounts); - } } else { error!("No session found for mount operation"); } + + info!("Waiting for mount operation to complete"); + std::thread::sleep(std::time::Duration::from_secs(1)); + info!("Mount operation completed"); } Commands::Session { list, kill, purge, session_id } => { if *list { diff --git a/src/modules/mount.rs b/src/modules/mount.rs index 7919e2a..8326f30 100644 --- a/src/modules/mount.rs +++ b/src/modules/mount.rs @@ -9,7 +9,7 @@ use super::constants::BLOCK_SIZE; use super::namespace::{BindMode, NamespaceEntry}; use super::proto::{BoundEntry, NineP}; use anyhow::{anyhow, Result}; -use fuser::{FileAttr, FileType}; +use fuser::{FileAttr, FileType, MountOption}; use std::collections::HashMap; use std::collections::VecDeque; use std::ffi::CString; @@ -17,10 +17,11 @@ use std::ffi::OsString; use std::fs; use std::path::Path; use std::time::UNIX_EPOCH; -use log::{info, debug, warn}; +use log::{info, debug, warn, error}; use std::cell::RefCell; use std::sync::Arc; use crate::session::Session; +use std::thread; #[cfg(target_os = "macos")] extern "C" { @@ -38,7 +39,7 @@ extern "C" { } /// Manages filesystem mounting and binding operations. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct FilesystemManager { /// The underlying 9P filesystem implementation. pub fs: NineP, @@ -335,47 +336,65 @@ impl FilesystemManager { pub fn mount(&self, source: &Path, target: &Path, node_id: &str) -> Result<()> { info!("Mounting {} to {} for node {}", source.display(), target.display(), node_id); - // Resolve paths + // Check if we have a current session + debug!("Checking current session..."); + if let Some(session) = self.get_session() { + info!("Found existing session: {:?}", session); + } else { + warn!("No current session found before mount"); + } + + debug!("Checking paths..."); let abs_source = fs::canonicalize(source)?; let abs_target = fs::canonicalize(target)?; + info!("Resolved paths - source: {:?}, target: {:?}", abs_source, abs_target); // Verify paths exist if !abs_source.exists() { + error!("Source path does not exist: {:?}", abs_source); return Err(anyhow!("Source path does not exist: {:?}", abs_source)); } if !abs_target.exists() { + error!("Target path does not exist: {:?}", abs_target); return Err(anyhow!("Target path does not exist: {:?}", abs_target)); } - // Create mount entry + thread::sleep(std::time::Duration::from_millis(100)); + + // Update namespace first + debug!("Updating namespace..."); let entry = NamespaceEntry { source: abs_source.clone(), target: abs_target.clone(), - bind_mode: BindMode::Before, // Default mode for mounts + bind_mode: BindMode::Before, remote_node: Some(node_id.to_string()), }; // Update namespace + debug!("Updating namespace..."); let mut namespace = self.fs.namespace_manager.namespace.write().unwrap(); namespace .entry(abs_target.clone()) .or_insert_with(Vec::new) .push(entry); - - // Perform the mount operation - self.mount_directory(abs_target.to_str().unwrap(), &abs_source)?; + + // Update bindings + self.update_bindings(abs_target.to_str().unwrap(), &abs_source)?; // Notify session of successful mount info!("Mount operation successful, notifying session"); - if let Some(session) = FilesystemManager::get_current_session() { + if let Some(session) = self.get_session() { info!("Found current session, sending notification"); - session.notify_mount_success(source.to_path_buf(), target.to_path_buf())?; - info!("Mount success notification sent"); + match session.notify_mount_success(source.to_path_buf(), target.to_path_buf()) { + Ok(_) => info!("Mount success notification sent"), + Err(e) => warn!("Failed to send mount notification: {:?}", e), + } } else { warn!("No current session found for mount notification"); } + info!("Mount operation completed successfully"); Ok(()) } @@ -503,6 +522,38 @@ impl FilesystemManager { } Ok(()) } + + fn update_bindings(&self, dir_path: &str, source_path: &Path) -> Result<()> { + debug!("Updating bindings for: {} from source: {:?}", dir_path, source_path); + + let mut bindings = self.fs.namespace_manager.bindings.lock().unwrap(); + let mut next_inode = self.fs.namespace_manager.next_inode.lock().unwrap(); + + // Convert paths to absolute paths + let abs_source = fs::canonicalize(source_path)?; + let abs_target = fs::canonicalize(Path::new(dir_path))?; + + // Clear existing bindings but keep root + bindings.retain(|&ino, _| ino == 1); + + // Read source directory recursively + self.read_directory_entries_recursive( + &abs_source, + &abs_source, + 1, + &mut next_inode, + &mut bindings, + )?; + + info!("Final bindings: {:?}", bindings.keys().collect::>()); + for (inode, (name, entry)) in bindings.iter() { + debug!( + "inode: {}, name: {:?}, kind: {:?}", + inode, name, entry.attr.kind + ); + } + Ok(()) + } } #[cfg(test)] diff --git a/src/modules/session.rs b/src/modules/session.rs index c769626..2c82350 100644 --- a/src/modules/session.rs +++ b/src/modules/session.rs @@ -8,7 +8,7 @@ use crate::FilesystemManager; use anyhow::Result; -use log::{error, info}; +use log::{error, info, warn}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; @@ -70,16 +70,20 @@ impl SessionManager { /// * `Ok(String)` - Session ID of the created session /// * `Err` if session creation fails pub fn create_session(&self, root: PathBuf) -> Result { + info!("Creating new session for root: {}", root.display()); + // First, check if there's an existing session for this root + info!("Checking for existing sessions..."); let existing_sessions = self.list_sessions()?; for session in existing_sessions { if session.root == root { - // Verify the session is still active by checking the process + info!("Found existing session {} for root {}", session.id, root.display()); + // Verify the session is still active if let Ok(_) = signal::kill(Pid::from_raw(session.pid), Signal::SIGCONT) { - info!("Reusing existing session {} for root {}", session.id, root.display()); + info!("Reusing existing session {}", session.id); return Ok(session.id); } else { - // Session is dead, remove its file and continue to create new session + info!("Existing session is dead, removing it"); let session_file = self.sessions_dir.join(&session.id); if let Err(e) = fs::remove_file(session_file) { error!("Failed to remove dead session file: {}", e); @@ -88,11 +92,17 @@ impl SessionManager { } } - // If no existing valid session found, create a new one + info!("No existing session found, creating new one"); let session_id = Uuid::new_v4().to_string(); + info!("Generated new session ID: {}", session_id); + + info!("Attempting to fork process..."); + let fork_result = unsafe { fork() }; + info!("Fork completed"); - match unsafe { fork() } { + match fork_result { Ok(ForkResult::Parent { child }) => { + info!("In parent process. Child PID: {}", child); let session_info = SessionInfo { id: session_id.clone(), pid: child.as_raw(), @@ -102,42 +112,37 @@ impl SessionManager { }; let session_file = self.sessions_dir.join(&session_id); - fs::write(&session_file, serde_json::to_string(&session_info)?)?; - - // Wait a moment for the child process to set up - std::thread::sleep(std::time::Duration::from_millis(100)); + info!("Saving session info to: {}", session_file.display()); + match fs::write(&session_file, serde_json::to_string(&session_info)?) { + Ok(_) => info!("Session info saved successfully"), + Err(e) => error!("Failed to save session info: {}", e), + } - info!("Created new session: {} for root {}", session_id, root.display()); + info!("Parent process completed successfully"); Ok(session_id) } Ok(ForkResult::Child) => { - let runtime = tokio::runtime::Runtime::new()?; - let session = Session::new(root.clone(), session_id.clone())?; - let session_clone = session.clone(); - - FilesystemManager::set_current_session(session.clone()); - - info!("Session stored in thread-local storage"); - + info!("In child process"); + // Create the pipe immediately let pipe_path = self.sessions_dir.join(format!("{}.pipe", session_id)); + info!("Creating pipe at: {}", pipe_path.display()); if !pipe_path.exists() { - nix::unistd::mkfifo(&pipe_path, nix::sys::stat::Mode::S_IRWXU)?; + match nix::unistd::mkfifo(&pipe_path, nix::sys::stat::Mode::S_IRWXU) { + Ok(_) => info!("Pipe created successfully"), + Err(e) => error!("Failed to create pipe: {}", e), + } } - std::thread::spawn(move || { - runtime.block_on(async { - if let Err(e) = session_clone.run().await { - error!("Session error: {}", e); - } - }); - }); - + info!("Child process entering main loop"); loop { std::thread::sleep(std::time::Duration::from_secs(1)); } } - Err(e) => Err(anyhow::anyhow!("Fork failed: {}", e)), + Err(e) => { + error!("Fork failed with error: {}", e); + Err(anyhow::anyhow!("Fork failed: {}", e)) + } } } @@ -147,15 +152,43 @@ impl SessionManager { /// * `Ok(Vec)` - Information about all active sessions /// * `Err` if reading session information fails pub fn list_sessions(&self) -> Result> { + info!("Starting to list sessions from: {}", self.sessions_dir.display()); let mut sessions = Vec::new(); - for entry in fs::read_dir(&self.sessions_dir)? { - let entry = entry?; - if let Ok(content) = fs::read_to_string(entry.path()) { - if let Ok(info) = serde_json::from_str(&content) { - sessions.push(info); + + match fs::read_dir(&self.sessions_dir) { + Ok(entries) => { + info!("Successfully read sessions directory"); + for entry_result in entries { + match entry_result { + Ok(entry) => { + info!("Processing entry: {:?}", entry.path()); + if entry.path().extension().map_or(false, |ext| ext == "json") { + match fs::read_to_string(entry.path()) { + Ok(content) => { + info!("Read session file content"); + match serde_json::from_str(&content) { + Ok(info) => { + info!("Successfully parsed session info"); + sessions.push(info); + } + Err(e) => error!("Failed to parse session info: {}", e), + } + } + Err(e) => error!("Failed to read session file: {}", e), + } + } + } + Err(e) => error!("Failed to process directory entry: {}", e), + } } } + Err(e) => { + error!("Failed to read sessions directory: {}", e); + return Err(anyhow::anyhow!("Failed to read sessions directory: {}", e)); + } } + + info!("Found {} sessions", sessions.len()); Ok(sessions) } @@ -307,23 +340,45 @@ impl SessionManager { /// * `Err` if the session doesn't exist or the command couldn't be sent pub fn send_mount_command(&self, session_id: &str, source: PathBuf, target: PathBuf, node_id: String) -> Result<()> { info!("Sending mount command to session {}", session_id); - if let Some(session) = self.get_session(session_id)? { - // Create a named pipe or socket for IPC - let pipe_path = self.sessions_dir.join(format!("{}.pipe", session_id)); - if !pipe_path.exists() { - nix::unistd::mkfifo(&pipe_path, nix::sys::stat::Mode::S_IRWXU)?; - } + if let Some(active_session) = self.get_active_session(session_id)? { + // Fork before mounting + match unsafe { fork() }? { + ForkResult::Parent { child } => { + info!("Started mount process with PID: {}", child); + + // Continue with sending the command through the pipe + let pipe_path = self.sessions_dir.join(format!("{}.pipe", session_id)); + if !pipe_path.exists() { + nix::unistd::mkfifo(&pipe_path, nix::sys::stat::Mode::S_IRWXU)?; + } - // Write the mount command to the pipe - let command = SessionCommand::Mount { - source, - target, - node_id, - }; - let command_str = serde_json::to_string(&command)?; - fs::write(&pipe_path, command_str)?; - info!("Mount command sent through pipe"); - Ok(()) + let command = SessionCommand::Mount { + source, + target, + node_id, + }; + let command_str = serde_json::to_string(&command)?; + + let mut file = std::fs::OpenOptions::new() + .write(true) + .open(&pipe_path)?; + + use std::io::Write; + file.write_all(command_str.as_bytes())?; + + info!("Mount command sent through pipe"); + Ok(()) + } + ForkResult::Child => { + // Child process handles the FUSE mount + let fs_manager = active_session.fs_manager.clone(); + if let Err(e) = fs_manager.mount(&source, &target, &node_id) { + error!("Mount failed in child process: {}", e); + std::process::exit(1); + } + std::process::exit(0); + } + } } else { Err(anyhow::anyhow!("Session not found")) } @@ -384,6 +439,7 @@ enum SessionMessage { /// # Ok(()) /// # } /// ``` +#[derive(Debug)] pub struct Session { /// The filesystem manager instance pub fs_manager: FilesystemManager, @@ -684,9 +740,18 @@ impl Session { /// * `Ok(())` if shutdown was successful /// * `Err` if there was an error during shutdown pub fn shutdown(&self) -> Result<()> { - info!("Shutting down session"); - self.message_tx.send(SessionMessage::Shutdown)?; + info!("Shutting down session..."); self.is_running.store(false, Ordering::SeqCst); + + // Send shutdown message + self.message_tx.send(SessionMessage::Shutdown)?; + + // Clean up session file + let session_file = format!("/tmp/froggr/sessions/{}", self.state.read().id); + if let Err(e) = fs::remove_file(&session_file) { + warn!("Failed to remove session file: {}", e); + } + Ok(()) } @@ -856,19 +921,6 @@ impl Session { } } -/// Implements cleanup on drop. -/// -/// When a Session is dropped, it ensures the message processing thread -/// is properly shut down. -impl Drop for Session { - fn drop(&mut self) { - self.is_running.store(false, Ordering::SeqCst); - if let Err(e) = self.message_tx.send(SessionMessage::Shutdown) { - error!("Error sending shutdown message: {}", e); - } - } -} - #[derive(Debug, Clone, Default)] struct SessionState { id: String,