Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add api -> video processor job request after video is uploaded #8

Merged
merged 2 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .env

This file was deleted.

2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
uploads/
node_modules/
.DS_Store
.env*
!.env.example
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ tower-http = { version = "0.6", features = ["fs", "trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
db = { path = "../db" }
grpc = { path = "../grpc" }
tonic = "0.12"
prost = "0.13"
4 changes: 4 additions & 0 deletions api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ pub struct AppState {
config: Config,
}

pub enum AppError {
QueueConnectionError,
}

#[tokio::main]
async fn main() {
// Start the tracer
Expand Down
32 changes: 31 additions & 1 deletion api/src/upload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ use std::sync::Arc;

use axum::extract::Multipart;
use axum::extract::State;
use grpc::video_processing::raw_video_processor_client::RawVideoProcessorClient;
use grpc::video_processing::ProcessRawVideoRequest;
use sqlx::types::chrono;
use sqlx::Pool;
use sqlx::Postgres;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tonic::Request;

use crate::templates::UploadTemplate;
use crate::AppState;
Expand All @@ -31,10 +34,17 @@ pub async fn handle_upload_mp4(
}

if file_written {
// First, save the raw video
let video = save_raw_upload(&state.db, file_id, file_path)
.await
.map_err(|err| return UploadTemplate::new().upload_error(&err))
.unwrap();
.expect("Could not save raw video");
// Second, send a job to the video processing queue to process this video
send_video_processing_request(video.id.clone(), video.raw_file_path)
.await
.expect("Could send processing request");
// Lastly, render the upload complete template
// TODO: Add processing status into template
UploadTemplate::new().upload_id(&video.id)
} else {
UploadTemplate::new().upload_error("Some error uploading file")
Expand Down Expand Up @@ -82,3 +92,23 @@ async fn save_raw_upload(

Ok(video)
}

/// Sends a job to the video processor service to queue a video processing job
async fn send_video_processing_request(video_id: String, file_path: String) -> Result<(), String> {
let queue_url = std::env::var("QUEUE_URL").expect("QUEUE_URL must be set");
let mut client = RawVideoProcessorClient::connect(queue_url)
.await
.map_err(|e| format!("Could not connect to RawVideoProcessorClient {}", e))?;

let request = Request::new(ProcessRawVideoRequest {
id: video_id,
path: file_path,
});
let response = client
.process_raw_video(request)
.await
.map_err(|e| format!("Could not send video processing request {}", e));

tracing::debug!("Job processing response: {:?}", response);
Ok(())
}