From 452927a75f628e49a9d1c405b88a6a24aa01be12 Mon Sep 17 00:00:00 2001 From: Ziy1-Tan Date: Sun, 8 Sep 2024 16:43:41 +0800 Subject: [PATCH] support task & sandboxer instrument Signed-off-by: Ziy1-Tan --- vmm/common/src/tracer.rs | 24 +++---------- vmm/sandbox/config_clh.toml | 2 ++ vmm/sandbox/config_stratovirt_aarch64.toml | 3 +- vmm/sandbox/config_stratovirt_x86_64.toml | 1 + vmm/sandbox/src/bin/cloud_hypervisor/main.rs | 37 ++++++++------------ vmm/sandbox/src/bin/qemu/main.rs | 31 +++++----------- vmm/sandbox/src/bin/stratovirt/main.rs | 33 +++++------------ vmm/sandbox/src/cloud_hypervisor/mod.rs | 13 ++++++- vmm/sandbox/src/sandbox.rs | 31 +++++++++++++++- vmm/task/src/container.rs | 19 +++++++++- vmm/task/src/main.rs | 29 +++++++-------- 11 files changed, 117 insertions(+), 106 deletions(-) diff --git a/vmm/common/src/tracer.rs b/vmm/common/src/tracer.rs index 812d6c3d..c76fb634 100644 --- a/vmm/common/src/tracer.rs +++ b/vmm/common/src/tracer.rs @@ -14,27 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -use std::str::FromStr; - use opentelemetry::sdk::trace::Tracer; use opentelemetry::sdk::{trace, Resource}; -use opentelemetry_otlp::WithExportConfig; -use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; -const DEFAULT_JAEGER_ENDPOINT: &str = "http://localhost:14268/api/traces"; - -pub fn create_otlp_tracer( - otlp_service_name: &str, - otlp_endpoint: Option, -) -> anyhow::Result { +pub fn init_otlp_tracer(otlp_service_name: &str) -> anyhow::Result { let tracer = opentelemetry_otlp::new_pipeline() .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(otlp_endpoint.unwrap_or(DEFAULT_JAEGER_ENDPOINT.to_string())), - ) + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) .with_trace_config(trace::config().with_resource(Resource::new(vec![ opentelemetry::KeyValue::new("service.name", otlp_service_name.to_string()), ]))) @@ -43,10 +30,9 @@ pub fn create_otlp_tracer( Ok(tracer) } -pub fn create_logger_filter(level: &str) -> anyhow::Result { - let log_level = LevelFilter::from_str(&level)?; +pub fn init_logger_filter(log_level: &str) -> anyhow::Result { let filter = EnvFilter::from_default_env() - .add_directive(format!("containerd_sandbox={:?}", log_level).parse()?) - .add_directive(format!("vmm_sandboxer={:?}", log_level).parse()?); + .add_directive(format!("containerd_sandbox={}", log_level).parse()?) + .add_directive(format!("vmm_sandboxer={}", log_level).parse()?); Ok(filter) } diff --git a/vmm/sandbox/config_clh.toml b/vmm/sandbox/config_clh.toml index 5e0d3ebc..ea6c7382 100644 --- a/vmm/sandbox/config_clh.toml +++ b/vmm/sandbox/config_clh.toml @@ -1,5 +1,6 @@ [sandbox] log_level = "info" +enable_tracing = false [hypervisor] path = "/usr/local/bin/cloud-hypervisor" @@ -15,6 +16,7 @@ debug = false [hypervisor.task] debug = false +enable_tracing = false [hypervisor.virtiofsd] path = "/usr/local/bin/virtiofsd" diff --git a/vmm/sandbox/config_stratovirt_aarch64.toml b/vmm/sandbox/config_stratovirt_aarch64.toml index 75878fc5..da4aa2db 100644 --- a/vmm/sandbox/config_stratovirt_aarch64.toml +++ b/vmm/sandbox/config_stratovirt_aarch64.toml @@ -1,5 +1,6 @@ [sandbox] log_level = "info" +enable_tracing = false [hypervisor] path = "/usr/bin/stratovirt" @@ -15,4 +16,4 @@ debug = true enable_mem_prealloc = false [hypervisor.virtiofsd_conf] -path = "/usr/bin/vhost_user_fs" \ No newline at end of file +path = "/usr/bin/vhost_user_fs" diff --git a/vmm/sandbox/config_stratovirt_x86_64.toml b/vmm/sandbox/config_stratovirt_x86_64.toml index ab592328..17e0f442 100644 --- a/vmm/sandbox/config_stratovirt_x86_64.toml +++ b/vmm/sandbox/config_stratovirt_x86_64.toml @@ -1,5 +1,6 @@ [sandbox] log_level = "info" +enable_tracing = false [hypervisor] path = "/usr/bin/stratovirt" diff --git a/vmm/sandbox/src/bin/cloud_hypervisor/main.rs b/vmm/sandbox/src/bin/cloud_hypervisor/main.rs index a50d1f01..193ba21e 100644 --- a/vmm/sandbox/src/bin/cloud_hypervisor/main.rs +++ b/vmm/sandbox/src/bin/cloud_hypervisor/main.rs @@ -16,11 +16,10 @@ limitations under the License. use clap::Parser; use opentelemetry::global; -use tracing::{error, info_span}; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; +use tracing::{info, info_span}; use tracing_subscriber::Layer; -use vmm_common::tracer::{create_logger_filter, create_otlp_tracer}; +use tracing_subscriber::{layer::SubscriberExt, Registry}; +use vmm_common::tracer::{init_logger_filter, init_otlp_tracer}; use vmm_sandboxer::{ args, cloud_hypervisor::{factory::CloudHypervisorVMFactory, hooks::CloudHypervisorHooks}, @@ -40,31 +39,19 @@ async fn main() { let config = Config::load_config(&args.config).await.unwrap(); // Update args log level if it not presents args but in config. - let env_filter = - match create_logger_filter(&args.log_level.unwrap_or(config.sandbox.log_level())) { - Ok(filter) => filter, - Err(e) => { - error!("failed to init logger filter: {:?}", e); - return; - } - }; + let env_filter = init_logger_filter(&args.log_level.unwrap_or(config.sandbox.log_level())) + .expect("failed to init logger filter"); let mut layers = vec![tracing_subscriber::fmt::layer().boxed()]; if config.sandbox.enable_tracing { - let tracer = match create_otlp_tracer("kuasar-vmm-sandboxer-clh-tracing-service", None) { - Ok(tracer) => tracer, - Err(e) => { - error!("failed to init otlp tracer: {:?}", e); - return; - } - }; + let tracer = init_otlp_tracer("kuasar-vmm-sandboxer-clh-tracing-service") + .expect("failed to init otlp tracer"); + layers.push(tracing_opentelemetry::layer().with_tracer(tracer).boxed()); } - tracing_subscriber::registry() - .with(env_filter) - .with(layers) - .init(); + let subscriber = Registry::default().with(env_filter).with(layers); + tracing::subscriber::set_global_default(subscriber).expect("unable to set global subscriber"); let root_span = info_span!("kuasar-vmm-sandboxer-clh-root").entered(); @@ -78,6 +65,8 @@ async fn main() { // Do recovery job sandboxer.recover(&args.dir).await; + info!("Kuasar vmm sandboxer clh is started"); + // Run the sandboxer containerd_sandbox::run( "kuasar-vmm-sandboxer-clh", @@ -88,6 +77,8 @@ async fn main() { .await .unwrap(); + info!("Kuasar vmm sandboxer clh is exited"); + root_span.exit(); global::shutdown_tracer_provider(); } diff --git a/vmm/sandbox/src/bin/qemu/main.rs b/vmm/sandbox/src/bin/qemu/main.rs index 388764b4..7d6c2299 100644 --- a/vmm/sandbox/src/bin/qemu/main.rs +++ b/vmm/sandbox/src/bin/qemu/main.rs @@ -16,11 +16,10 @@ limitations under the License. use clap::Parser; use opentelemetry::global; -use tracing::{error, info_span}; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; +use tracing::info_span; use tracing_subscriber::Layer; -use vmm_common::tracer::{create_logger_filter, create_otlp_tracer}; +use tracing_subscriber::{layer::SubscriberExt, Registry}; +use vmm_common::tracer::{init_logger_filter, init_otlp_tracer}; use vmm_sandboxer::{ args, config::Config, @@ -57,30 +56,18 @@ async fn main() { }; // Initialize log filter - let env_filter = match create_logger_filter(&config.sandbox.log_level()) { - Ok(filter) => filter, - Err(e) => { - error!("failed to init logger filter: {:?}", e); - return; - } - }; + let env_filter = + init_logger_filter(&config.sandbox.log_level()).expect("failed to init logger filter"); let mut layers = vec![tracing_subscriber::fmt::layer().boxed()]; if config.sandbox.enable_tracing { - let tracer = match create_otlp_tracer("kuasar-vmm-sandboxer-qemu-tracing-service", None) { - Ok(tracer) => tracer, - Err(e) => { - error!("failed to init otlp tracer: {:?}", e); - return; - } - }; + let tracer = init_otlp_tracer("kuasar-vmm-sandboxer-qemu-tracing-service") + .expect("failed to init otlp tracer"); layers.push(tracing_opentelemetry::layer().with_tracer(tracer).boxed()); } - tracing_subscriber::registry() - .with(env_filter) - .with(layers) - .init(); + let subscriber = Registry::default().with(env_filter).with(layers); + tracing::subscriber::set_global_default(subscriber).expect("unable to set global subscriber"); let root_span = info_span!("kuasar-vmm-sandboxer-qemu-root").entered(); diff --git a/vmm/sandbox/src/bin/stratovirt/main.rs b/vmm/sandbox/src/bin/stratovirt/main.rs index 35ea55fb..e65315c7 100644 --- a/vmm/sandbox/src/bin/stratovirt/main.rs +++ b/vmm/sandbox/src/bin/stratovirt/main.rs @@ -16,11 +16,10 @@ limitations under the License. use clap::Parser; use opentelemetry::global; -use tracing::{error, info_span}; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; +use tracing::info_span; +use tracing_subscriber::{layer::SubscriberExt, Registry}; use tracing_subscriber::Layer; -use vmm_common::tracer::{create_logger_filter, create_otlp_tracer}; +use vmm_common::tracer::{init_logger_filter, init_otlp_tracer}; use vmm_sandboxer::{ args, config::Config, @@ -42,32 +41,18 @@ async fn main() { let config: Config = Config::load_config(&args.config).await.unwrap(); // Update args log level if it not presents args but in config. - let env_filter = - match create_logger_filter(&args.log_level.unwrap_or(config.sandbox.log_level())) { - Ok(filter) => filter, - Err(e) => { - error!("failed to init logger filter: {:?}", e); - return; - } - }; + let env_filter = init_logger_filter(&args.log_level.unwrap_or(config.sandbox.log_level())) + .expect("failed to init logger filter"); let mut layers = vec![tracing_subscriber::fmt::layer().boxed()]; if config.sandbox.enable_tracing { - let tracer = match create_otlp_tracer("kuasar-vmm-sandboxer-stratovirt-otlp-service", None) - { - Ok(tracer) => tracer, - Err(e) => { - error!("failed to init otlp tracer: {:?}", e); - return; - } - }; + let tracer = init_otlp_tracer("kuasar-vmm-sandboxer-stratovirt-otlp-service") + .expect("failed to init otlp tracer"); layers.push(tracing_opentelemetry::layer().with_tracer(tracer).boxed()); } - tracing_subscriber::registry() - .with(env_filter) - .with(layers) - .init(); + let subscriber = Registry::default().with(env_filter).with(layers); + tracing::subscriber::set_global_default(subscriber).expect("unable to set global subscriber"); let root_span = info_span!("kuasar-vmm-sandboxer-stratovirt-root").entered(); diff --git a/vmm/sandbox/src/cloud_hypervisor/mod.rs b/vmm/sandbox/src/cloud_hypervisor/mod.rs index b1a9540c..57285ae5 100644 --- a/vmm/sandbox/src/cloud_hypervisor/mod.rs +++ b/vmm/sandbox/src/cloud_hypervisor/mod.rs @@ -28,7 +28,7 @@ use tokio::{ sync::watch::{channel, Receiver, Sender}, task::JoinHandle, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, instrument, warn}; use vmm_common::SHARED_DIR_SUFFIX; use crate::{ @@ -156,6 +156,7 @@ impl CloudHypervisorVM { #[async_trait] impl VM for CloudHypervisorVM { + #[instrument(skip(self))] async fn start(&mut self) -> Result { create_dir_all(&self.base_dir).await?; let virtiofsd_pid = self.start_virtiofsd().await?; @@ -209,6 +210,7 @@ impl VM for CloudHypervisorVM { Ok(pid.unwrap_or_default()) } + #[instrument(skip(self))] async fn stop(&mut self, force: bool) -> Result<()> { let signal = if force { signal::SIGKILL @@ -240,6 +242,7 @@ impl VM for CloudHypervisorVM { Ok(()) } + #[instrument(skip(self))] async fn attach(&mut self, device_info: DeviceInfo) -> Result<()> { match device_info { DeviceInfo::Block(blk_info) => { @@ -274,31 +277,37 @@ impl VM for CloudHypervisorVM { Ok(()) } + #[instrument(skip(self))] async fn hot_attach(&mut self, device_info: DeviceInfo) -> Result<(BusType, String)> { let client = self.get_client()?; let addr = client.hot_attach(device_info)?; Ok((BusType::PCI, addr)) } + #[instrument(skip(self))] async fn hot_detach(&mut self, id: &str) -> Result<()> { let client = self.get_client()?; client.hot_detach(id)?; Ok(()) } + #[instrument(skip(self))] async fn ping(&self) -> Result<()> { // TODO Ok(()) } + #[instrument(skip(self))] fn socket_address(&self) -> String { self.agent_socket.to_string() } + #[instrument(skip(self))] async fn wait_channel(&self) -> Option> { self.wait_chan.clone() } + #[instrument(skip(self))] async fn vcpus(&self) -> Result { // Refer to https://github.com/firecracker-microvm/firecracker/issues/718 Ok(VcpuThreads { @@ -320,6 +329,7 @@ impl VM for CloudHypervisorVM { }) } + #[instrument(skip(self))] fn pids(&self) -> Pids { self.pids.clone() } @@ -327,6 +337,7 @@ impl VM for CloudHypervisorVM { #[async_trait] impl crate::vm::Recoverable for CloudHypervisorVM { + #[instrument(skip(self))] async fn recover(&mut self) -> Result<()> { self.client = Some(self.create_client().await?); let pid = self.pid()?; diff --git a/vmm/sandbox/src/sandbox.rs b/vmm/sandbox/src/sandbox.rs index 28d5dc73..97d22eea 100644 --- a/vmm/sandbox/src/sandbox.rs +++ b/vmm/sandbox/src/sandbox.rs @@ -32,7 +32,7 @@ use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, sync::{Mutex, RwLock}, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, instrument, warn}; use vmm_common::{ api::sandbox_ttrpc::SandboxServiceClient, storage::Storage, ETC_HOSTS, ETC_RESOLV, HOSTNAME_FILENAME, HOSTS_FILENAME, RESOLV_FILENAME, SHARED_DIR_SUFFIX, @@ -83,6 +83,7 @@ where H: Hooks, F::VM: VM + DeserializeOwned + Recoverable + Sync + Send + 'static, { + #[instrument(skip(self))] pub async fn recover(&mut self, dir: &str) { let mut subs = match tokio::fs::read_dir(dir).await { Ok(subs) => subs, @@ -153,6 +154,7 @@ where { type Sandbox = KuasarSandbox; + #[instrument(skip(self))] async fn create(&self, id: &str, s: SandboxOption) -> Result<()> { if self.sandboxes.read().await.get(id).is_some() { return Err(Error::AlreadyExist("sandbox".to_string())); @@ -203,6 +205,7 @@ where Ok(()) } + #[instrument(skip(self))] async fn start(&self, id: &str) -> Result<()> { let sandbox_mutex = self.sandbox(id).await?; let mut sandbox = sandbox_mutex.lock().await; @@ -251,6 +254,7 @@ where Ok(()) } + #[instrument(skip(self))] async fn sandbox(&self, id: &str) -> Result>> { Ok(self .sandboxes @@ -261,6 +265,7 @@ where .clone()) } + #[instrument(skip(self))] async fn stop(&self, id: &str, force: bool) -> Result<()> { let sandbox_mutex = self.sandbox(id).await?; let mut sandbox = sandbox_mutex.lock().await; @@ -271,6 +276,7 @@ where Ok(()) } + #[instrument(skip(self))] async fn delete(&self, id: &str) -> Result<()> { let sb_clone = self.sandboxes.read().await.clone(); if let Some(sb_mutex) = sb_clone.get(id) { @@ -303,14 +309,17 @@ where { type Container = KuasarContainer; + #[instrument(skip(self))] fn status(&self) -> Result { Ok(self.status.clone()) } + #[instrument(skip(self))] async fn ping(&self) -> Result<()> { self.vm.ping().await } + #[instrument(skip(self))] async fn container(&self, id: &str) -> Result<&Self::Container> { let container = self .containers @@ -319,6 +328,7 @@ where Ok(container) } + #[instrument(skip(self))] async fn append_container(&mut self, id: &str, options: ContainerOption) -> Result<()> { let handler_chain = self.container_append_handlers(id, options)?; handler_chain.handle(self).await?; @@ -326,6 +336,7 @@ where Ok(()) } + #[instrument(skip(self))] async fn update_container(&mut self, id: &str, options: ContainerOption) -> Result<()> { let handler_chain = self.container_update_handlers(id, options).await?; handler_chain.handle(self).await?; @@ -333,6 +344,7 @@ where Ok(()) } + #[instrument(skip(self))] async fn remove_container(&mut self, id: &str) -> Result<()> { self.deference_container_storages(id).await?; @@ -356,10 +368,12 @@ where Ok(()) } + #[instrument(skip(self))] async fn exit_signal(&self) -> Result> { Ok(self.exit_signal.clone()) } + #[instrument(skip(self))] fn get_data(&self) -> Result { Ok(self.data.clone()) } @@ -369,6 +383,7 @@ impl KuasarSandbox where V: VM + Sync + Send, { + #[instrument(skip(self))] async fn dump(&self) -> Result<()> { let dump_data = serde_json::to_vec(&self).map_err(|e| anyhow!("failed to serialize sandbox, {}", e))?; @@ -392,6 +407,7 @@ impl KuasarSandbox where V: VM + DeserializeOwned + Recoverable + Sync + Send, { + #[instrument(skip(base_dir))] async fn recover>(base_dir: P) -> Result { let dump_path = base_dir.as_ref().join("sandbox.json"); let mut dump_file = OpenOptions::new() @@ -436,6 +452,8 @@ impl KuasarSandbox where V: VM + Sync + Send, { + + #[instrument(skip(self))] async fn start(&mut self) -> Result<()> { let pid = self.vm.start().await?; @@ -459,6 +477,7 @@ where Ok(()) } + #[instrument(skip(self))] async fn stop(&mut self, force: bool) -> Result<()> { match self.status { // If a sandbox is created: @@ -494,17 +513,20 @@ where Ok(()) } + #[instrument(skip(self))] pub(crate) fn container_mut(&mut self, id: &str) -> Result<&mut KuasarContainer> { self.containers .get_mut(id) .ok_or_else(|| Error::NotFound(format!("no container with id {}", id))) } + #[instrument(skip(self))] pub(crate) fn increment_and_get_id(&mut self) -> u32 { self.id_generator += 1; self.id_generator } + #[instrument(skip(self))] async fn init_client(&mut self) -> Result<()> { let mut client_guard = self.client.lock().await; if client_guard.is_none() { @@ -520,6 +542,7 @@ where Ok(()) } + #[instrument(skip(self))] pub(crate) async fn setup_network(&mut self) -> Result<()> { if let Some(network) = self.network.as_ref() { let client_guard = self.client.lock().await; @@ -531,6 +554,7 @@ where Ok(()) } + #[instrument(skip(self))] pub(crate) async fn sync_clock(&self) { let client_guard = self.client.lock().await; if let Some(client) = &*client_guard { @@ -538,6 +562,7 @@ where } } + #[instrument(skip(self))] async fn setup_sandbox_files(&self) -> Result<()> { let shared_path = self.get_sandbox_shared_path(); create_dir_all(&shared_path) @@ -587,10 +612,12 @@ where Ok(()) } + #[instrument(skip(self))] pub fn get_sandbox_shared_path(&self) -> String { format!("{}/{}", self.base_dir, SHARED_DIR_SUFFIX) } + #[instrument(skip(self))] pub async fn prepare_network(&mut self) -> Result<()> { // get vcpu for interface queue, at least one vcpu let mut vcpu = 1; @@ -613,6 +640,7 @@ where } // If a sandbox is still running, destroy network may hang with its running + #[instrument(skip(self))] pub async fn destroy_network(&mut self) { // Network should be destroyed only once, take it out here. if let Some(mut network) = self.network.take() { @@ -620,6 +648,7 @@ where } } + #[instrument(skip(self))] pub async fn add_to_cgroup(&self) -> Result<()> { // Currently only support cgroup V1, cgroup V2 is not supported now if !cgroups_rs::hierarchies::is_cgroup2_unified_mode() { diff --git a/vmm/task/src/container.rs b/vmm/task/src/container.rs index e604e09d..18cb44a1 100644 --- a/vmm/task/src/container.rs +++ b/vmm/task/src/container.rs @@ -52,7 +52,7 @@ use tokio::{ process::Command, sync::Mutex, }; -use tracing::{debug, error}; +use tracing::{debug, error, instrument}; use vmm_common::{mount::get_mount_type, storage::Storage, KUASAR_STATE_DIR}; use crate::{ @@ -108,6 +108,7 @@ pub struct Log { #[async_trait] impl ContainerFactory for KuasarFactory { + #[instrument(skip(self))] async fn create( &self, ns: &str, @@ -180,6 +181,7 @@ impl ContainerFactory for KuasarFactory { Ok(container) } + #[instrument(skip(self, c))] async fn cleanup(&self, _ns: &str, c: &KuasarContainer) -> containerd_shim::Result<()> { self.sandbox.lock().await.defer_storages(&c.id).await?; Ok(()) @@ -191,6 +193,7 @@ impl KuasarFactory { Self { sandbox } } + #[instrument(skip_all)] async fn do_create(&self, init: &mut InitProcess) -> Result<()> { let id = init.id.to_string(); let stdio = &init.stdio; @@ -348,6 +351,7 @@ impl ProcessLifecycle for KuasarInitLifecycle { Ok(()) } + #[instrument(skip_all)] async fn kill( &self, p: &mut InitProcess, @@ -370,6 +374,7 @@ impl ProcessLifecycle for KuasarInitLifecycle { Ok(()) } + #[instrument(skip_all)] async fn delete(&self, p: &mut InitProcess) -> containerd_shim::Result<()> { if let Err(e) = self .runtime @@ -390,6 +395,7 @@ impl ProcessLifecycle for KuasarInitLifecycle { } #[cfg(target_os = "linux")] + #[instrument(skip_all)] async fn update(&self, p: &mut InitProcess, resources: &LinuxResources) -> Result<()> { if p.pid <= 0 { return Err(other!( @@ -401,11 +407,13 @@ impl ProcessLifecycle for KuasarInitLifecycle { } #[cfg(not(target_os = "linux"))] + #[instrument(skip_all)] async fn update(&self, _p: &mut InitProcess, _resources: &LinuxResources) -> Result<()> { Err(Error::Unimplemented("update resource".to_string())) } #[cfg(target_os = "linux")] + #[instrument(skip_all)] async fn stats(&self, p: &InitProcess) -> Result { if p.pid <= 0 { return Err(other!( @@ -417,10 +425,12 @@ impl ProcessLifecycle for KuasarInitLifecycle { } #[cfg(not(target_os = "linux"))] + #[instrument(skip_all)] async fn stats(&self, _p: &InitProcess) -> Result { Err(Error::Unimplemented("process stats".to_string())) } + #[instrument(skip_all)] async fn ps(&self, p: &InitProcess) -> Result> { let pids = self .runtime @@ -438,6 +448,7 @@ impl ProcessLifecycle for KuasarInitLifecycle { } impl KuasarInitLifecycle { + #[instrument(skip_all)] pub fn new(runtime: Runc, opts: Options, bundle: &str) -> Self { let work_dir = Path::new(bundle).join("work"); let mut opts = opts; @@ -455,6 +466,7 @@ impl KuasarInitLifecycle { #[async_trait] impl ProcessLifecycle for KuasarExecLifecycle { + #[instrument(skip_all)] async fn start(&self, p: &mut ExecProcess) -> containerd_shim::Result<()> { rescan_pci_bus().await?; let bundle = self.bundle.to_string(); @@ -492,6 +504,7 @@ impl ProcessLifecycle for KuasarExecLifecycle { Ok(()) } + #[instrument(skip_all)] async fn kill( &self, p: &mut ExecProcess, @@ -514,6 +527,7 @@ impl ProcessLifecycle for KuasarExecLifecycle { } } + #[instrument(skip_all)] async fn delete(&self, p: &mut ExecProcess) -> Result<()> { self.exit_signal.signal(); let exec_pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", p.id)); @@ -521,14 +535,17 @@ impl ProcessLifecycle for KuasarExecLifecycle { Ok(()) } + #[instrument(skip_all)] async fn update(&self, _p: &mut ExecProcess, _resources: &LinuxResources) -> Result<()> { Err(Error::Unimplemented("exec update".to_string())) } + #[instrument(skip_all)] async fn stats(&self, _p: &ExecProcess) -> Result { Err(Error::Unimplemented("exec stats".to_string())) } + #[instrument(skip_all)] async fn ps(&self, _p: &ExecProcess) -> Result> { Err(Error::Unimplemented("exec ps".to_string())) } diff --git a/vmm/task/src/main.rs b/vmm/task/src/main.rs index 946c3b92..0094e307 100644 --- a/vmm/task/src/main.rs +++ b/vmm/task/src/main.rs @@ -37,17 +37,19 @@ use nix::{ }; use opentelemetry::global; use signal_hook_tokio::Signals; -use std::{collections::HashMap, convert::TryFrom, path::Path, process::exit, sync::Arc, thread}; +use std::{ + collections::HashMap, convert::TryFrom, fmt::Debug, path::Path, process::exit, sync::Arc, + thread, +}; use tokio::fs::File; use tracing::{debug, error, info, info_span, warn}; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::Layer; use tracing_subscriber::{self, EnvFilter}; +use tracing_subscriber::{layer::SubscriberExt, Registry}; use vmm_common::{ - api::sandbox_ttrpc::create_sandbox_service, mount::mount, tracer::create_otlp_tracer, - ETC_RESOLV, HOSTNAME_FILENAME, IPC_NAMESPACE, KUASAR_STATE_DIR, RESOLV_FILENAME, - SANDBOX_NS_PATH, UTS_NAMESPACE, + api::sandbox_ttrpc::create_sandbox_service, mount::mount, tracer::init_otlp_tracer, ETC_RESOLV, + HOSTNAME_FILENAME, IPC_NAMESPACE, KUASAR_STATE_DIR, RESOLV_FILENAME, SANDBOX_NS_PATH, + UTS_NAMESPACE, }; use crate::{ @@ -148,6 +150,7 @@ async fn start_task_server() -> anyhow::Result<()> { let config = TaskConfig::new().await?; init_logger(&config.log_level, config.enable_tracing)?; + info!("Task server start with config: {:?}", config); match &*config.sharefs_type { @@ -175,21 +178,19 @@ async fn start_task_server() -> anyhow::Result<()> { Ok(()) } -fn init_logger(level: &str, enable_tracing: bool) -> anyhow::Result<()> { +fn init_logger(log_level: &str, enable_tracing: bool) -> anyhow::Result<()> { let env_filter = EnvFilter::from_default_env() - .add_directive(format!("containerd_shim={:?}", level).parse()?) - .add_directive(format!("vmm_task={:?}", level).parse()?); + .add_directive(format!("containerd_shim={:?}", log_level).parse()?) + .add_directive(format!("vmm_task={:?}", log_level).parse()?); let mut layers = vec![tracing_subscriber::fmt::layer().boxed()]; if enable_tracing { - let tracer = create_otlp_tracer("kuasar-vmm-task-otlp-service", None)?; + let tracer = init_otlp_tracer("kuasar-vmm-task-otlp-service")?; layers.push(tracing_opentelemetry::layer().with_tracer(tracer).boxed()); } - tracing_subscriber::registry() - .with(env_filter) - .with(layers) - .init(); + let subscriber = Registry::default().with(env_filter).with(layers); + tracing::subscriber::set_global_default(subscriber).expect("unable to set global subscriber"); Ok(()) }