Implementing download and the full task
caiosba committed Mar 3, 2024
1 parent 98f11ca commit c73e840
Showing 1 changed file with 69 additions and 23 deletions.
92 changes: 69 additions & 23 deletions lib/tasks/check_khousheh.rake
@@ -4,6 +4,16 @@ namespace :check do

PER_PAGE = Rails.env.development? ? 10 : 2000

TIMESTAMP = Time.now.to_f.to_s.gsub('.', '')

def print_task_title(title)
puts '----------------------------------------------------------------'
puts title.upcase + '...'
puts '----------------------------------------------------------------'
puts
end

# FIXME: Load only the claims we need
def claim_uuid_for_duplicate_quote
puts 'Collecting claim media UUIDs for duplicate quotes...'
claim_uuid = {}
@@ -17,14 +27,16 @@ namespace :check do
# docker-compose exec -e elasticsearch_log=0 api bundle exec rake check:khousheh:generate_input
desc 'Generate input files in JSON format.'
task generate_input: :environment do
print_task_title 'Generating input files'
FileUtils.mkdir_p(File.join(Rails.root, 'tmp', 'feed-clusters-input'))
started = Time.now.to_i
# Collect claim media UUIDs for duplicate quote
claim_uuid = claim_uuid_for_duplicate_quote
sort = [{ annotated_id: { order: :asc } }]
Feed.find_each do |feed|
# Only feeds that are sharing media
if feed.data_points.to_a.include?(2)
output = { call_id: feed.uuid, nodes: [], edges: [] }
output = { call_id: "#{TIMESTAMP}-#{feed.uuid}", nodes: [], edges: [] }
Team.current = feed.team
query = { feed_id: feed.id, feed_view: 'media', show_similar: true }
es_query = CheckSearch.new(query.to_json).medias_query
@@ -92,7 +104,7 @@ namespace :check do
search_after = [pm_ids.max]
puts "\nDone for page #{page}/#{pages}\n"
end
file = File.open(File.join(Rails.root, 'tmp', "feed-#{feed.uuid}.json"), 'w+')
file = File.open(File.join(Rails.root, 'tmp', 'feed-clusters-input', "#{TIMESTAMP}-#{feed.uuid}.json"), 'w+')
file.puts output.to_json
file.close
Team.current = nil
@@ -104,34 +116,30 @@ namespace :check do

# docker-compose exec -e elasticsearch_log=0 -e CLUSTER_INPUT_BUCKET=bucket-name api bundle exec rake check:khousheh:upload
desc 'Upload input JSON files to S3.'
task upload: :environment do
task upload: [:environment, :generate_input] do
print_task_title 'Uploading input files'
started = Time.now.to_i
bucket_name = ENV.fetch('CLUSTER_INPUT_BUCKET')
region = CheckConfig.get('storage_bucket_region') || 'eu-west-1'
begin
s3_client = Aws::S3::Client.new(region: region)
rescue Aws::Sigv4::Errors::MissingCredentialsError
puts 'Please provide the AWS credentials.'
exit 1
end
Feed.find_each do |feed|
# Only feeds that are sharing media
if feed.data_points.to_a.include?(2)
filename = "feed-#{feed.uuid}.json"
filepath = File.join(Rails.root, 'tmp', filename)
begin
response = s3_client.put_object(
bucket: bucket_name,
key: filename,
body: File.read(filepath)
)
if response.etag
puts "Uploaded #{filename}."
else
puts "Error uploading #{filename} to S3. Check credentials?"
end
rescue StandardError => e
puts "Error uploading S3 object: #{e.message}."
filename = "#{TIMESTAMP}-#{feed.uuid}.json"
filepath = File.join(Rails.root, 'tmp', 'feed-clusters-input', filename)
response = s3_client.put_object(
bucket: bucket_name,
key: filename,
body: File.read(filepath)
)
if response.etag
puts "Uploaded #{filename}."
else
puts "Error uploading #{filename} to S3."
end
end
end
@@ -141,16 +149,47 @@ namespace :check do

# docker-compose exec -e elasticsearch_log=0 -e CLUSTER_OUTPUT_BUCKET=bucket-name api bundle exec rake check:khousheh:download
desc 'Download output JSON files from S3.'
task download: :environment do
task download: [:environment, :upload] do
print_task_title 'Downloading output files'
FileUtils.mkdir_p(File.join(Rails.root, 'tmp', 'feed-clusters-output'))
started = Time.now.to_i
# TODO: Download output files from S3.
bucket_name = ENV.fetch('CLUSTER_OUTPUT_BUCKET')
region = CheckConfig.get('storage_bucket_region') || 'eu-west-1'
s3_client = Aws::S3::Client.new(region: region)
Feed.find_each do |feed|
# Only feeds that are sharing media
if feed.data_points.to_a.include?(2)
filename = "#{TIMESTAMP}-#{feed.uuid}.json"
filepath = File.join(Rails.root, 'tmp', 'feed-clusters-output', filename)
# Retry for up to one hour
attempts = 0
object = nil
while attempts < 60 && object.nil?
begin
object = s3_client.get_object(bucket: bucket_name, key: filename)
rescue StandardError => e
puts "File #{filename} not found in bucket #{bucket_name}, trying again in 1 minute..."
sleep 60
attempts += 1
end
end
if object.nil?
puts "Aborting. File #{filename} not found in bucket #{bucket_name}."
else
file = File.open(File.join(Rails.root, 'tmp', 'feed-clusters-output', "#{TIMESTAMP}-#{feed.uuid}.json"), 'w+')
file.puts object.body.read
file.close
end
end
end
minutes = ((Time.now.to_i - started) / 60).to_i
puts "[#{Time.now}] Done in #{minutes} minutes."
end

# docker-compose exec -e elasticsearch_log=0 api bundle exec rake check:khousheh:parse_output
desc 'Parse output files (JSON format) and recreate clusters.'
task parse_output: :environment do
task parse_output: [:environment, :download] do
print_task_title 'Parsing output files'
started = Time.now.to_i
claim_uuid = claim_uuid_for_duplicate_quote
sort = [{ annotated_id: { order: :asc } }]
@@ -161,7 +200,7 @@ namespace :check do
puts "Parsing feed #{feed.name}..."
begin
last_old_cluster_id = Cluster.where(feed_id: feed.id).order('id ASC').last&.id
clusters = JSON.parse(File.read(File.join(Rails.root, 'tmp', "#{feed.uuid}.json")))
clusters = JSON.parse(File.read(File.join(Rails.root, 'tmp', 'feed-clusters-output', "#{TIMESTAMP}-#{feed.uuid}.json")))
started_at = Time.now.to_f
Cluster.transaction do
# Create clusters
@@ -244,6 +283,7 @@ namespace :check do
updated_cluster_attributes[:project_media_id] = cluster.project_media_id || pm.id
updated_cluster_attributes[:title] = cluster.title || pm.title
# Update cluster
# FIXME: Update clusters in batches
cluster.update_columns(updated_cluster_attributes)
end
end
@@ -272,5 +312,11 @@ namespace :check do
minutes = ((Time.now.to_i - started) / 60).to_i
puts "[#{Time.now}] Done in #{minutes} minutes."
end

# docker-compose exec -e elasticsearch_log=0 -e CLUSTER_INPUT_BUCKET=bucket-name -e CLUSTER_OUTPUT_BUCKET=bucket-name api bundle exec rake check:khousheh:rebuild_clusters
desc 'Rebuild clusters.'
task rebuild: [:environment, :parse_output] do
print_task_title "[#{TIMESTAMP}] Rebuilding clusters"
end
end
end
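
With this commit the tasks are wired together through Rake prerequisites: `rebuild` depends on `parse_output`, which depends on `download`, then `upload`, then `generate_input`, so a single `rake check:khousheh:rebuild` invocation runs the whole pipeline in order. A minimal standalone sketch of that prerequisite-chaining pattern, with a hypothetical `pipeline` namespace and placeholder task bodies that are not part of this repository:

# Standalone Rakefile sketch of the same prerequisite chaining (hypothetical names).
namespace :pipeline do
  task :generate_input do
    puts 'generating input'
  end

  # Runs generate_input before its own body.
  task upload: [:generate_input] do
    puts 'uploading input'
  end

  task download: [:upload] do
    puts 'downloading output'
  end

  task parse_output: [:download] do
    puts 'parsing output'
  end

  task rebuild: [:parse_output] do
    puts 'rebuild complete'
  end
end

Running `rake pipeline:rebuild` prints the five messages in dependency order (generate_input, upload, download, parse_output, rebuild), mirroring how the new `rebuild` task drives the full flow end to end.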
