Skip to content

Commit

Permalink
CV2-2648: migrate tipline requests (#1811)
Browse files Browse the repository at this point in the history
  • Loading branch information
melsawy authored Feb 28, 2024
1 parent 248a78e commit 2f00193
Show file tree
Hide file tree
Showing 3 changed files with 379 additions and 0 deletions.
2 changes: 2 additions & 0 deletions app/models/tipline_request.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# ActiveRecord model for tipline requests. No associations, validations or
# callbacks are declared here; by Rails naming convention it is backed by the
# `tipline_requests` table.
class TiplineRequest < ApplicationRecord
end
24 changes: 24 additions & 0 deletions db/migrate/20231122054128_create_tipline_requests.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Creates the `tipline_requests` table together with its indexes.
class CreateTiplineRequests < ActiveRecord::Migration[6.1]
# Reversible schema change: table creation plus two additional indexes.
def change
create_table :tipline_requests do |t|
# Mandatory, indexed request attributes.
t.string :language, null: false, index: true
t.string :tipline_user_uid, index: true
t.string :platform, null: false, index: true
# Smooch-specific attributes; only the request type is mandatory.
t.string :smooch_request_type, null: false
t.string :smooch_resource_id, null: true
# Defaults to an empty string (see the partial unique index below).
t.string :smooch_message_id, null: true, default: ''
t.string :smooch_conversation_id, null: true
# Request payload stored as JSONB; never NULL, defaults to an empty object.
t.jsonb :smooch_data, null: false, default: {}
# Polymorphic owner of the request (associated_type / associated_id columns).
t.references :associated, polymorphic: true, null: false
t.references :team, null: false
t.references :user
# Report event markers stored as integers, 0 by default.
# NOTE(review): the `_at` suffix suggests Unix timestamps — confirm against the writers of these columns.
t.integer :smooch_report_received_at, default: 0
t.integer :smooch_report_update_received_at, default: 0
t.integer :smooch_report_correction_sent_at, default: 0
t.integer :smooch_report_sent_at, default: 0
t.timestamps
end
# NOTE(review): `t.references :associated, polymorphic: true` above may already create an
# index on these two columns by default — confirm this explicit index is not a duplicate.
add_index :tipline_requests, [:associated_type, :associated_id]
# Partial unique index: uniqueness is enforced only for non-NULL, non-empty message ids,
# so any number of rows can keep the '' default.
add_index :tipline_requests, :smooch_message_id, unique: true, where: "smooch_message_id IS NOT NULL AND smooch_message_id != ''"
end
end
353 changes: 353 additions & 0 deletions lib/tasks/migrate/20231122054128_migrate_tipline_requests.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
namespace :check do
namespace :migrate do
# Parses rake "extras" arguments into a Hash of String keys and values.
#
# Each element of +args+ is a string such as 'slug:my-team&batch_size:500':
# pairs are separated by '&', and the key is separated from the value by the
# FIRST ':' only, so values may themselves contain colons (e.g. URLs).
# A pair without ':' yields a nil value; empty pairs are ignored.
#
# @param args [Array<String>, nil] typically `args.extras` of a rake task
# @return [Hash{String => String, nil}] empty when +args+ is nil or empty
def parse_args(args)
  output = {}
  return output if args.nil? || args.empty?

  args.each do |arg|
    arg.split('&').each do |pair|
      # Limit of 2 keeps any further ':' inside the value.
      key, value = pair.split(':', 2)
      next if key.nil?

      output[key] = value
    end
  end
  output
end

# Copies the 'smooch' annotations attached directly to +team+ into the
# tipline_requests table, processing +batch_size+ annotations per bulk insert.
#
# For each annotation, its dynamic fields are folded into one row: the
# `smooch_data` field supplies the row id, user uid, language, platform and
# timestamps, and every field is stored under its own name
# (`smooch_report_received` is stored as `smooch_report_received_at`).
#
# Returns an Array containing team.id when at least one bulk insert raised,
# otherwise an empty Array.
def migrate_team_tipline_requests(team, batch_size)
  failed_teams = []
  team_requests = Annotation.where(annotation_type: 'smooch', annotated_type: 'Team', annotated_id: team.id)
  total_count = team_requests.count
  return failed_teams unless total_count > 0

  puts "\nMigrating Team requests[#{team.slug}]: #{total_count} requests"
  inserts = 0
  team_requests.find_in_batches(:batch_size => batch_size) do |annotations|
    print '.'
    # Lookup tables keyed by annotation id: associated record id and annotator id.
    smooch_obj = annotations.map { |a| [a.id, a.annotated_id] }.to_h
    smooch_user = annotations.map { |a| [a.id, a.annotator_id] }.to_h
    obj_requests = Hash.new { |hash, key| hash[key] = [] }
    DynamicAnnotation::Field.where(annotation_type: 'smooch', annotation_id: smooch_obj.keys).find_each do |f|
      print '.'
      value = f.value
      # `smooch_report_received` feeds two columns: `smooch_report_received_at`
      # and (when the field was updated later) `smooch_report_update_received_at`.
      field_name = f.field_name
      field_name = 'smooch_report_received_at' if field_name == 'smooch_report_received'
      if field_name == 'smooch_data'
        value.gsub!('\u0000', '') if value.is_a?(String) # Avoid PG::UntranslatableCharacter exception
        value = begin
          JSON.parse(value)
        rescue StandardError
          {}
        end
        # Columns derived from the payload. The smooch_data field id becomes the
        # TiplineRequest primary key; language/platform get fallbacks to avoid PG errors.
        obj_requests[f.annotation_id] << {
          'id' => f.id,
          'tipline_user_uid' => value.dig('authorId'),
          'language' => value.dig('language') || 'en',
          'platform' => value.dig('source', 'type') || 'whatsapp',
          'created_at' => f.created_at,
          'updated_at' => f.updated_at
        }
      end
      entry = { field_name => value }
      if field_name == 'smooch_report_received_at' && f.created_at != f.updated_at
        # Value for the `smooch_report_update_received_at` column
        entry['smooch_report_update_received_at'] = value
      end
      obj_requests[f.annotation_id] << entry
    end
    # One row per annotation; every row starts from the full set of defaults
    # and is then overwritten by whatever fields were collected.
    requests = obj_requests.map do |annotation_id, fields|
      row = {
        associated_type: 'Team',
        associated_id: smooch_obj[annotation_id],
        user_id: smooch_user[annotation_id],
        team_id: team.id,
        smooch_request_type: 'default_requests',
        smooch_resource_id: nil,
        smooch_message_id: '',
        smooch_conversation_id: nil,
        smooch_report_received_at: 0,
        smooch_report_update_received_at: 0,
        smooch_report_correction_sent_at: 0,
        smooch_report_sent_at: 0,
      }.with_indifferent_access
      fields.each do |attrs|
        attrs.each { |column, column_value| row[column] = column_value }
      end
      row
    end
    next if requests.blank?

    inserts += requests.size
    puts "\nImporting Team requests[#{team.slug}]: #{inserts}/#{total_count}\n"
    begin
      TiplineRequest.insert_all(requests)
    rescue StandardError
      # Best effort: remember the team and keep going with the next batch.
      failed_teams |= [team.id]
    end
  end
  failed_teams
end

# Builds (but does not insert) TiplineRequest rows for every 'smooch'
# annotation attached to records of +annotated_type+ whose ids are in +ids+.
#
# Each annotation's dynamic fields are folded into one row: the `smooch_data`
# field supplies the row id, user uid, language, platform and timestamps, and
# every field is stored under its own name (`smooch_report_received` is stored
# as `smooch_report_received_at`). +team_id+ is written to every row.
#
# Returns an Array of indifferent-access Hashes.
def bulk_import_requests_items(annotated_type, ids, team_id)
  print '.'
  # Lookup tables keyed by annotation id: associated record id and annotator id.
  smooch_obj = {}
  smooch_user = {}
  Annotation.where(annotation_type: 'smooch', annotated_type: annotated_type, annotated_id: ids).find_each do |annotation|
    print '.'
    smooch_obj[annotation.id] = annotation.annotated_id
    smooch_user[annotation.id] = annotation.annotator_id
  end

  obj_requests = Hash.new { |hash, key| hash[key] = [] }
  DynamicAnnotation::Field.where(annotation_type: 'smooch', annotation_id: smooch_obj.keys).find_each do |f|
    print '.'
    value = f.value
    # `smooch_report_received` feeds two columns: `smooch_report_received_at`
    # and (when the field was updated later) `smooch_report_update_received_at`.
    field_name = f.field_name
    field_name = 'smooch_report_received_at' if field_name == 'smooch_report_received'
    if field_name == 'smooch_data'
      value.gsub!('\u0000', '') if value.is_a?(String) # Avoid PG::UntranslatableCharacter exception
      value = begin
        JSON.parse(value)
      rescue StandardError
        {}
      end
      # Columns derived from the payload. The smooch_data field id becomes the
      # TiplineRequest primary key; language/platform get fallbacks to avoid PG errors.
      obj_requests[f.annotation_id] << {
        'id' => f.id,
        'tipline_user_uid' => value.dig('authorId'),
        'language' => value.dig('language') || 'en',
        'platform' => value.dig('source', 'type') || 'whatsapp',
        'created_at' => f.created_at,
        'updated_at' => f.updated_at
      }
    end
    entry = { field_name => value }
    if field_name == 'smooch_report_received_at' && f.created_at != f.updated_at
      # Value for the `smooch_report_update_received_at` column
      entry['smooch_report_update_received_at'] = value
    end
    obj_requests[f.annotation_id] << entry
  end

  # One row per annotation; every row starts from the full set of defaults
  # and is then overwritten by whatever fields were collected.
  obj_requests.map do |annotation_id, fields|
    row = {
      associated_type: annotated_type,
      associated_id: smooch_obj[annotation_id],
      user_id: smooch_user[annotation_id],
      team_id: team_id,
      smooch_request_type: 'default_requests',
      smooch_resource_id: nil,
      smooch_message_id: '',
      smooch_conversation_id: nil,
      smooch_report_received_at: 0,
      smooch_report_update_received_at: 0,
      smooch_report_correction_sent_at: 0,
      smooch_report_sent_at: 0,
    }.with_indifferent_access
    fields.each do |attrs|
      attrs.each { |column, column_value| row[column] = column_value }
    end
    row
  end
end
# Migrate TiplineRequests
# bundle exec rails check:migrate:migrate_tipline_requests['slug:team_slug&batch_size:batch_size']
#
# Walks teams in id order and, for each one, bulk-inserts TiplineRequest rows
# built from 'smooch' annotations attached to the Team itself, to its
# TiplineResources and to its ProjectMedias.
#
# Arguments (all optional):
# - batch_size: number of records per batch (default 1500).
# - slug: restrict the run to that team; the special value 'custom_slugs'
#   prompts for a comma-separated list of slugs on STDIN.
#
# Resuming: without a slug, the id of the last processed team is stored in
# Rails.cache and the next run continues after it. With a slug, that checkpoint
# is ignored and processed team ids are remembered in a separate cache key so
# they are skipped on a re-run.
#
# Insert failures do not abort the task: the error is printed, the team id is
# recorded, and the affected teams are listed per category at the end.
task migrate_tipline_requests: :environment do |_t, args|
  started = Time.now.to_i
  data_args = parse_args args.extras
  batch_size = (data_args['batch_size'] || 1500).to_i
  slug = data_args['slug']
  condition = {}
  last_team_id = Rails.cache.read('check:migrate:migrate_tipline_requests:team_id') || 0
  unless slug.blank?
    # Explicit slugs ignore the "resume after last team" checkpoint.
    last_team_id = 0
    if slug == 'custom_slugs'
      puts "Type team slugs separated by comma then press enter"
      print ">> "
      slug = begin STDIN.gets.chomp.split(',').map{ |s| s.to_s } rescue [] end
      raise "You must call rake task with team slugs" if slug.blank?
    end
    condition = { slug: slug }
  end
  failed_project_media_requests = []
  failed_team_requests = []
  failed_tipline_resource_requests = []
  Team.where(condition).where('id > ?', last_team_id).find_each do |team|
    print '.'
    migrate_teams = Rails.cache.read('check:migrate:migrate_tipline_requests:migrate_teams') || []
    next if migrate_teams.include?(team.id)

    # Migrate Team requests.
    # Accumulate (instead of overwriting) so failures from every team are reported.
    failed_team_requests.concat(migrate_team_tipline_requests(team, batch_size))

    # Migrate TiplineResource requests
    total_count = team.tipline_resources.joins("INNER JOIN annotations a ON a.annotated_id = tipline_resources.id")
      .where("a.annotated_type = ? AND a.annotation_type = ?", 'TiplineResource', 'smooch').count
    if total_count > 0
      puts "\nMigrating TiplineResource requests[#{team.slug}]: #{total_count} requests"
      inserts = 0
      team.tipline_resources.find_in_batches(:batch_size => batch_size) do |items|
        print '.'
        ids = items.map(&:id)
        requests = bulk_import_requests_items('TiplineResource', ids, team.id)
        next if requests.blank?

        inserts += requests.count
        puts "\nImporting TiplineResource[#{team.slug}]: #{inserts}/#{total_count}\n"
        begin
          TiplineRequest.insert_all(requests)
        rescue StandardError => e
          puts "\nFailed to import TiplineResource requests[#{team.slug}]: #{e.message}"
          failed_tipline_resource_requests << team.id unless failed_tipline_resource_requests.include?(team.id)
        end
      end
    end

    # Migrate ProjectMedia requests
    # Get the total count for team requests
    total_count = team.project_medias.joins("INNER JOIN annotations a ON a.annotated_id = project_medias.id")
      .where("a.annotated_type = ? AND a.annotation_type = ?", 'ProjectMedia', 'smooch').count
    if total_count > 0
      puts "\nMigrating ProjectMedia requests[#{team.slug}]: #{total_count} requests"
      inserts = 0
      team.project_medias.find_in_batches(:batch_size => batch_size) do |pms|
        print '.'
        ids = pms.map(&:id)
        requests = bulk_import_requests_items('ProjectMedia', ids, team.id)
        next if requests.blank?

        inserts += requests.count
        puts "\nImporting ProjectMedia requests[#{team.slug}]: #{inserts}/#{total_count}\n"
        begin
          TiplineRequest.insert_all(requests)
        rescue StandardError => e
          puts "\nFailed to import ProjectMedia requests[#{team.slug}]: #{e.message}"
          # Record in the list that is reported below (the previous code referenced
          # an undefined local here, which raised NameError inside the rescue).
          failed_project_media_requests << team.id unless failed_project_media_requests.include?(team.id)
        end
      end
    end

    # Checkpointing: per-team list when slugs were given, last team id otherwise.
    if slug.blank?
      Rails.cache.write('check:migrate:migrate_tipline_requests:team_id', team.id)
    else
      migrate_teams << team.id
      Rails.cache.write('check:migrate:migrate_tipline_requests:migrate_teams', migrate_teams)
    end
  end
  puts "Failed to import some project media requests related to the following teams #{failed_project_media_requests.inspect}" if failed_project_media_requests.length > 0
  puts "Failed to import some team requests related to the following teams #{failed_team_requests.inspect}" if failed_team_requests.length > 0
  puts "Failed to import some tipline resource requests related to the following teams #{failed_tipline_resource_requests.inspect}" if failed_tipline_resource_requests.length > 0
  minutes = ((Time.now.to_i - started) / 60).to_i
  puts "[#{Time.now}] Done in #{minutes} minutes."
end

# list teams that have a different count between TiplineRequest and smooch annotation (list teams that not fully migrated)
# bundle exec rails check:migrate:migrate_tipline_requests_status[team_slug1, team_slug2, ...]
task migrate_tipline_requests_status: :environment do |_t, args|
# Get missing requests based on a comparison between TiplineRequest.id and smooch_data field id
slugs = args.extras
condition = {}
condition = { slug: slugs } unless slugs.blank?
logs = []
Team.where(condition).find_each do |team|
print '.'
requests_ids = TiplineRequest.where(team_id: team.id).map(&:id)
requests_count = requests_ids.count
smooch_ids = Annotation.where(annotation_type: 'smooch', annotated_type: 'ProjectMedia')
.joins("INNER JOIN project_medias pm ON pm.id = annotations.annotated_id")
.where('pm.team_id = ?', team.id)
sd_ids = DynamicAnnotation::Field.where(field_name: 'smooch_data', annotation_type: 'smooch', annotation_id: smooch_ids)
.where.not(id: requests_ids).map(&:id)
logs << {id: team.id, slug: team.slug, requests: requests_count, smooch: sd_ids} if sd_ids.length > 0
end
puts "List of teams that not fully migrated"
pp logs
end

# Migrate missing requests related to specific teams
# bundle exec rails check:migrate:migrate_tipline_requests_missing_requests['slug:team_slug&batch_size:batch_size']
#
# For each team matching the (mandatory) slug, finds `smooch_data` fields of
# ProjectMedia 'smooch' annotations that have no TiplineRequest row with the
# same id, rebuilds the full request rows for the owning annotations and
# bulk-inserts them. batch_size defaults to 1000.
#
# Raises RuntimeError when no slug is given. Insert errors are not rescued here.
task migrate_tipline_requests_missing_requests: :environment do |_t, args|
  data_args = parse_args args.extras
  slug = data_args['slug']
  raise "You must call rake task with team slugs" if slug.blank?
  condition = { slug: slug }
  # Arguments arrive as strings; convert so find_in_batches receives an Integer
  # (same handling as the main migration task).
  batch_size = (data_args['batch_size'] || 1000).to_i
  started = Time.now.to_i
  Team.where(condition).find_each do |team|
    print '.'
    # pluck reads only the needed columns instead of instantiating every record.
    requests_ids = TiplineRequest.where(team_id: team.id).pluck(:id)
    smooch_ids = Annotation.where(annotation_type: 'smooch', annotated_type: 'ProjectMedia')
      .joins("INNER JOIN project_medias pm ON pm.id = annotations.annotated_id")
      .where('pm.team_id = ?', team.id).pluck('annotations.id')
    # [field id, annotation id] pairs for smooch_data fields with no matching request.
    missing_fields = DynamicAnnotation::Field.where(field_name: 'smooch_data', annotation_type: 'smooch', annotation_id: smooch_ids)
      .where.not(id: requests_ids).pluck(:id, :annotation_id)
    next if missing_fields.empty?

    total_count = missing_fields.length
    inserts = 0
    print '.'
    missing_annotation_ids = missing_fields.map(&:last)
    Annotation.where(id: missing_annotation_ids).find_in_batches(:batch_size => batch_size) do |annotations|
      # Lookup tables keyed by annotation id: ProjectMedia id and annotator id.
      smooch_pm = {}
      smooch_user = {}
      pm_requests = Hash.new {|hash, key| hash[key] = [] }
      annotations.each do |d|
        print '.'
        smooch_pm[d.id] = d.annotated_id
        smooch_user[d.id] = d.annotator_id
      end
      DynamicAnnotation::Field.where(annotation_type: 'smooch', annotation_id: annotations.map(&:id)).find_each do |f|
        print '.'
        value = f.value
        # `smooch_report_received` feeds `smooch_report_received_at` and, when the
        # field was updated later, `smooch_report_update_received_at`.
        field_name = f.field_name == 'smooch_report_received' ? 'smooch_report_received_at' : f.field_name
        if field_name == 'smooch_data'
          value.gsub!('\u0000', '') if value.is_a?(String) # Avoid PG::UntranslatableCharacter exception
          value = begin JSON.parse(value) rescue {} end
          # Columns derived from the payload; the smooch_data field id becomes the request id.
          # (Named differently from the outer query result so it is not overwritten.)
          derived_columns = [
            { 'id' => f.id },
            { 'tipline_user_uid' => value.dig('authorId') },
            { 'language' => value.dig('language') || 'en' },
            { 'platform' => value.dig('source', 'type') || 'whatsapp' },
            { 'created_at' => f.created_at },
            { 'updated_at' => f.updated_at },
          ]
          pm_requests[f.annotation_id].concat(derived_columns)
        end
        pm_requests[f.annotation_id] << { field_name => value }
        if field_name == 'smooch_report_received_at' && f.created_at != f.updated_at
          pm_requests[f.annotation_id] << { 'smooch_report_update_received_at' => value }
        end
      end
      requests = []
      pm_requests.each do |d_id, fields|
        # Start from the full set of defaults, then apply the collected fields in order.
        r = {
          associated_type: 'ProjectMedia',
          associated_id: smooch_pm[d_id],
          user_id: smooch_user[d_id],
          team_id: team.id,
          smooch_request_type: 'default_requests',
          smooch_resource_id: nil,
          smooch_message_id: '',
          smooch_conversation_id: nil,
          smooch_report_received_at: 0,
          smooch_report_update_received_at: 0,
          smooch_report_correction_sent_at: 0,
          smooch_report_sent_at: 0,
        }.with_indifferent_access
        fields.each do |raws|
          raws.each{|k, v| r[k] = v }
        end
        requests << r
      end
      next if requests.blank?

      inserts += requests.count
      puts "\n#{team.slug}:: Importing #{inserts}/#{total_count} requests\n"
      TiplineRequest.insert_all(requests)
    end
  end
  minutes = ((Time.now.to_i - started) / 60).to_i
  puts "[#{Time.now}] Done in #{minutes} minutes."
end
end
end

0 comments on commit 2f00193

Please sign in to comment.