feat: gpu devices management (#18)
Description

- Detect which devices work and which ones are not supported, and pass
that information through to the user.
- Add `use-devices` and `exclude-devices` params to turn specified
devices on or off, e.g. `xtrgpuminer --use-devices=1,3,4`, which skips
device 2; `--exclude-devices=2` is another way of saying the same thing
(a minimal sketch of the flag handling follows below).
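
For context, here is a minimal, self-contained sketch of how these flags can be wired up with `clap` and combined into a list of device indexes, mirroring the filtering logic this commit adds to `src/main.rs`. The field names follow the diff below; the `devices_to_use` helper, the simplified `#[arg]` attributes, and the hard-coded device count in `main` are illustrative only, not the miner's actual code.

```rust
use clap::Parser;

/// Minimal sketch of the device-selection flags added in this commit.
#[derive(Parser)]
struct Cli {
    /// (Optional) use only specific devices, e.g. --use-devices=1,3,4
    #[arg(long, value_delimiter = ',')]
    use_devices: Option<Vec<u32>>,

    /// (Optional) exclude specific devices from use, e.g. --exclude-devices=2
    #[arg(long, value_delimiter = ',')]
    exclude_devices: Option<Vec<u32>>,
}

/// Combine both filters into the list of device indexes to mine on.
fn devices_to_use(cli: &Cli, num_devices: u32) -> Vec<u32> {
    (0..num_devices)
        // keep only indexes listed in --use-devices (or all, if the flag is absent)
        .filter(|i| cli.use_devices.as_ref().map_or(true, |used| used.contains(i)))
        // then drop any index listed in --exclude-devices
        .filter(|i| cli.exclude_devices.as_ref().map_or(true, |excl| !excl.contains(i)))
        .collect()
}

fn main() {
    let cli = Cli::parse();
    // Pretend 5 devices (indexes 0..5) were detected, purely for illustration.
    println!("Device indexes to use: {:?}", devices_to_use(&cli, 5));
}
```

With `--use-devices=1,3,4` this prints `[1, 3, 4]`; with `--exclude-devices=2` it prints every detected index except 2.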
karczuRF authored Oct 1, 2024
1 parent 9ef81d5 commit 0faacf9
Showing 4 changed files with 141 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -5,4 +5,4 @@ config.json
.DS_Store
.idea/
.vscode/*
!.vscode/extensions.json
!.vscode/extensions.json
4 changes: 3 additions & 1 deletion src/http/server.rs
@@ -1,6 +1,7 @@
use std::sync::Arc;

use axum::{routing::get, Router};
use log::{error, info};
use tari_shutdown::ShutdownSignal;
use thiserror::Error;
use tokio::io;
@@ -13,7 +14,6 @@ use crate::{
stats_store::StatsStore,
};

use log::info;
const LOG_TARGET: &str = "tari::gpuminer::server";

/// An HTTP server that provides stats and other useful information.
@@ -55,6 +55,7 @@ impl HttpServer {

/// Starts the http server on the port passed in ['HttpServer::new']
pub async fn start(&self) -> Result<(), Error> {
info!(target: LOG_TARGET, "Http: starts the http server on the port {:?}", self.config.port);
let router = self.routes();
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", self.config.port))
.await
@@ -67,6 +68,7 @@ impl HttpServer {
.await
.map_err(Error::IO)?;
println!("HTTP server stopped!");
info!(target: LOG_TARGET, "HTTP server stopped!");
Ok(())
}
}
85 changes: 73 additions & 12 deletions src/main.rs
@@ -65,11 +65,11 @@ const LOG_TARGET: &str = "tari::gpuminer";
async fn main() {
match main_inner().await {
Ok(()) => {
info!(target: LOG_TARGET, "Starting gpu_miner successfully");
info!(target: LOG_TARGET, "Gpu miner startup process completed successfully");
std::process::exit(0);
},
Err(err) => {
error!(target: LOG_TARGET, "Gpu_miner error: {}", err);
error!(target: LOG_TARGET, "Gpu miner startup process error: {}", err);
std::process::exit(1);
},
}
@@ -130,14 +130,22 @@ struct Cli {
#[arg(short = 'd', long, alias = "detect")]
detect: Option<bool>,

/// (Optional) use only specific devices
#[arg(long, alias = "use-devices", num_args=0.., value_delimiter=',')]
use_devices: Option<Vec<u32>>,

/// (Optional) exclude specific devices from use
#[arg(long, alias = "exclude-devices", num_args=0.., value_delimiter=',')]
exclude_devices: Option<Vec<u32>>,

/// Gpu status file path
#[arg(short, long, value_name = "gpu-status")]
gpu_status_file: Option<PathBuf>,
}

async fn main_inner() -> Result<(), anyhow::Error> {
let cli = Cli::parse();

info!(target: LOG_TARGET, "Xtrgpuminer init");
if let Some(ref log_dir) = cli.log_dir {
tari_common::initialize_logging(
&log_dir.join("log4rs_config.yml"),
@@ -218,6 +226,9 @@ async fn main_inner() -> Result<(), anyhow::Error> {
tokio::spawn(async move {
if let Err(error) = http_server.start().await {
println!("Failed to start HTTP server: {error:?}");
error!(target: LOG_TARGET, "Failed to start HTTP server: {:?}", error);
} else {
info!(target: LOG_TARGET, "Success to start HTTP server");
}
});
}
@@ -286,18 +297,45 @@ async fn main_inner() -> Result<(), anyhow::Error> {
return Err(anyhow::anyhow!("No available gpu device detected"));
}

// create a list of devices (by index) to use
let devices_to_use: Vec<u32> = (0..num_devices)
.filter(|x| {
if let Some(use_devices) = &cli.use_devices {
use_devices.contains(x)
} else {
true
}
})
.filter(|x| {
if let Some(excluded_devices) = &cli.exclude_devices {
!excluded_devices.contains(x)
} else {
true
}
})
.collect();

info!(target: LOG_TARGET, "Device indexes to use: {:?} from the total number of devices: {:?}", devices_to_use, num_devices);

let mut threads = vec![];
for i in 0..num_devices {
let c = config.clone();
let gpu = gpu_engine.clone();
let curr_stats_store = stats_store.clone();
threads.push(thread::spawn(move || {
run_thread(gpu, num_devices as u64, i as u32, c, benchmark, curr_stats_store)
}));
if devices_to_use.contains(&i) {
let c = config.clone();
let gpu = gpu_engine.clone();
let curr_stats_store = stats_store.clone();
threads.push(thread::spawn(move || {
run_thread(gpu, num_devices as u64, i as u32, c, benchmark, curr_stats_store)
}));
}
}

// for t in threads {
// t.join().unwrap()?;
// }
for t in threads {
t.join().unwrap()?;
if let Err(err) = t.join() {
error!(target: LOG_TARGET, "Thread join failed: {:?}", err);
}
}

shutdown.trigger();
@@ -395,11 +433,13 @@ fn run_thread<T: EngineImpl>(
let mut max_diff = 0;
let mut last_printed = Instant::now();
loop {
info!(target: LOG_TARGET, "Inside loop");
if elapsed.elapsed().as_secs() > config.template_refresh_secs {
info!(target: LOG_TARGET, "Elapsed {:?} > {:?}", elapsed.elapsed().as_secs(), config.template_refresh_secs );
break;
}
let num_iterations = 16;
let (nonce, hashes, diff) = gpu_engine.mine(
let result = gpu_engine.mine(
&gpu_function,
&context,
&data,
@@ -415,15 +455,30 @@ fn run_thread<T: EngineImpl>(
* grid_size,
* data_buf.as_device_ptr(),
* &output_buf, */
)?;
);
let (nonce, hashes, diff) = match result {
Ok(values) => {
info!(target: LOG_TARGET,
"Mining successful: nonce={:?}, hashes={}, difficulty={}",
values.0, values.1, values.2
);
(values.0, values.1, values.2)
},
Err(e) => {
error!(target: LOG_TARGET, "Mining failed: {}", e);
return Err(e.into());
},
};
if let Some(ref n) = nonce {
header.nonce = *n;
}
if diff > max_diff {
max_diff = diff;
}
nonce_start = nonce_start + hashes as u64;
info!(target: LOG_TARGET, "Nonce start {:?}", nonce_start.to_formatted_string(&Locale::en));
if elapsed.elapsed().as_secs() > 1 {
info!(target: LOG_TARGET, "Elapsed {:?} > 1",elapsed.elapsed().as_secs());
if Instant::now() - last_printed > std::time::Duration::from_secs(2) {
last_printed = Instant::now();
let hash_rate = (nonce_start - first_nonce) / elapsed.elapsed().as_secs();
@@ -446,7 +501,9 @@ fn run_thread<T: EngineImpl>(
hash_rate.to_formatted_string(&Locale::en));
}
}
info!(target: LOG_TARGET, "Inside loop nonce {:?}", nonce.clone().is_some());
if nonce.is_some() {
info!(target: LOG_TARGET, "Inside loop nonce is some {:?}", nonce.clone().is_some());
header.nonce = nonce.unwrap();

let mut mined_block = block.clone();
Expand All @@ -462,8 +519,10 @@ fn run_thread<T: EngineImpl>(
println!("Error submitting block: {:?}", e);
},
}
info!(target: LOG_TARGET, "Inside thread loop (nonce) break {:?}", num_threads);
break;
}
info!(target: LOG_TARGET, "Inside thread loop break {:?}", num_threads);
// break;
}
}
@@ -475,6 +534,7 @@ async fn get_template(
round: u32,
benchmark: bool,
) -> Result<(u64, minotari_app_grpc::tari_rpc::Block, BlockHeader, FixedHash), anyhow::Error> {
info!(target: LOG_TARGET, "Getting block template round {:?}", round);
if benchmark {
return Ok((
u64::MAX,
@@ -538,6 +598,7 @@ async fn get_template(
RangeProofType::RevealedValue,
)
.await?;
info!(target: LOG_TARGET, "Getting block template difficulty {:?}", miner_data.target_difficulty.clone());
let body = block_template.body.as_mut().expect("no block body");
let grpc_output = GrpcTransactionOutput::try_from(coinbase_output.clone()).map_err(|s| anyhow!(s))?;
body.outputs.push(grpc_output);
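
The per-device threading that `src/main.rs` gains above boils down to roughly the pattern below. This is a sketch only: `run_on_device` is a hypothetical stand-in for the real `run_thread`, and the device indexes are illustrative values rather than anything detected.

```rust
use std::thread;

// Hypothetical stand-in for the real run_thread; returns a Result like the original.
fn run_on_device(device_index: u32) -> Result<(), String> {
    println!("mining on device {device_index}");
    Ok(())
}

fn main() {
    // Indexes produced by the use/exclude filters; values are illustrative.
    let devices_to_use: Vec<u32> = vec![0, 2, 3];

    // Spawn one worker thread per selected device.
    let mut threads = Vec::new();
    for i in devices_to_use {
        threads.push(thread::spawn(move || run_on_device(i)));
    }

    // Join every worker; a failed join is logged instead of tearing down the whole miner.
    for t in threads {
        if let Err(err) = t.join() {
            eprintln!("Thread join failed: {err:?}");
        }
    }
}
```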
71 changes: 64 additions & 7 deletions src/opencl_engine.rs
@@ -83,6 +83,8 @@ impl EngineImpl for OpenClEngine {
};
gpu_devices.push(gpu);
total_devices += 1;
info!(target: LOG_TARGET, "Device nr {:?}: {}", total_devices, dev.name()?);
println!("Device nr {:?}: {}", total_devices, dev.name()?);
}
}
if total_devices > 0 {
@@ -105,7 +107,14 @@ impl EngineImpl for OpenClEngine {

fn create_main_function(&self, context: &Self::Context) -> Result<Self::Function, anyhow::Error> {
info!(target: LOG_TARGET, "OpenClEngine: create function");
let program = create_program_from_source(&context.context).unwrap();
// let program = create_program_from_source(&context.context).unwrap();
let program = match create_program_from_source(&context.context) {
Some(program) => program,
None => {
error!(target: LOG_TARGET, "Failed to create program");
return Err(anyhow::Error::msg("Failed to create program"));
},
};
Ok(OpenClFunction { program })
}

@@ -130,41 +139,87 @@ impl EngineImpl for OpenClEngine {
// 0
// )?;
unsafe {
info!(target: LOG_TARGET, "OpenClEngine: mine unsafe");
let queue = CommandQueue::create_default(&context.context, CL_QUEUE_PROFILING_ENABLE)
.expect("could not create command queue");

info!(target: LOG_TARGET, "OpenClEngine: created queue");

let batch_size = 1 << 19; // According to tests, but we can try work this out
let global_dimensions = [batch_size as usize];
let max_workgroups = Device::new(context.context.devices()[0]).max_work_group_size().unwrap();
// dbg!(max_compute);
// let max_work_items = queue.max_work_item_dimensions();
// dbg!(max_work_items);
// dbg!("here");
info!(target: LOG_TARGET, "OpenClEngine: cmax workgroups {:?}", max_workgroups);

let mut buffer =
Buffer::<cl_ulong>::create(&context.context, CL_MEM_READ_ONLY, data.len(), ptr::null_mut())?;
queue.enqueue_write_buffer(&mut buffer, CL_TRUE, 0, data, &[])?;
let output_buffer = Buffer::<cl_ulong>::create(&context.context, CL_MEM_WRITE_ONLY, 2, ptr::null_mut())?;
match Buffer::<cl_ulong>::create(&context.context, CL_MEM_READ_ONLY, data.len(), ptr::null_mut()) {
Ok(buffer) => buffer,
Err(e) => {
error!(target: LOG_TARGET, "OpenClEngine: failed to create buffer: {}", e);
return Err(e.into());
},
};
match queue.enqueue_write_buffer(&mut buffer, CL_TRUE, 0, data, &[]) {
Ok(_) => info!(target: LOG_TARGET, "OpenClEngine: buffer created"),
Err(e) => {
error!(target: LOG_TARGET, "OpenClEngine: failed to enqueue write buffer: {}", e);
return Err(e.into());
},
};

info!(target: LOG_TARGET, "OpenClEngine: buffer created",);
let output_buffer =
match Buffer::<cl_ulong>::create(&context.context, CL_MEM_WRITE_ONLY, 2, ptr::null_mut()) {
Ok(buffer) => buffer,
Err(e) => {
error!(target: LOG_TARGET, "OpenClEngine: failed to create output buffer: {}", e);
return Err(e.into());
},
};
// dbg!(block_size);
// dbg!(grid_size);
info!(target: LOG_TARGET, "OpenClEngine: output buffer created",);
info!(target: LOG_TARGET, "OpenClEngine: kernel work_size: g:{:?}",(grid_size * block_size) as usize);
for kernel in kernels {
ExecuteKernel::new(&kernel)
match ExecuteKernel::new(&kernel)
.set_arg(&buffer)
.set_arg(&nonce_start)
.set_arg(&min_difficulty)
.set_arg(&num_iterations)
.set_arg(&output_buffer)

.set_global_work_size((grid_size * block_size) as usize)
// .set_local_work_size(max_workgroups)
.set_local_work_size((grid_size * block_size / 2) as usize)
// .set_wait_event(&y_write_event)
.enqueue_nd_range(&queue).expect("culd not queue");
.enqueue_nd_range(&queue)
{
Ok(_) => info!(target: LOG_TARGET, "Kernel enqueued successfully"),
Err(e) => {
error!(target: LOG_TARGET, "Failed to enqueue kernel: {}", e);
// TODO
// if e == opencl3::Error::OutOfResources {
// error!(target: LOG_TARGET, "CL_OUT_OF_RESOURCES: insufficient resources");
// // TODO Handle the error accordingly
// }
},
}
// .expect("could not queue")
// .map_err(|e| {
// error!(target: LOG_TARGET, "OpenClEngine: failed to enqueue kernel: {}", e);
// e
// });

// TODO: find out better workdim
// queue.enqueue_nd_range_kernel(kernel.get(), 1, 0 as *const usize, global_dimensions.as_ptr(), 0 as
// *const usize, &[]).expect("could not execute");
}
queue.finish()?;

let mut output = vec![0u64, 0u64];
info!(target: LOG_TARGET, "OpenClEngine: mine output {:?}", output[0] > 0);
queue.enqueue_read_buffer(&output_buffer, CL_TRUE, 0, output.as_mut_slice(), &[])?;
if output[0] > 0 {
return Ok((
@@ -173,8 +228,10 @@ impl EngineImpl for OpenClEngine {
u64::MAX / output[1],
));
}
info!(target: LOG_TARGET, "OpenClEngine: mine unsafe return ok");
return Ok((None, grid_size * block_size * num_iterations, u64::MAX / output[1]));
}
info!(target: LOG_TARGET, "OpenClEngine: mine return ok");
Ok((None, grid_size * block_size * num_iterations, 0))
}
}
