Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CV2-2648: migrate tipline requests #1811

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/models/tipline_request.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# ActiveRecord model for tipline requests. It declares no associations,
# validations or callbacks of its own; everything comes from ApplicationRecord.
# Presumably backed by the `tipline_requests` table via the Rails naming
# convention (no explicit table name is set here).
class TiplineRequest < ApplicationRecord
end
24 changes: 24 additions & 0 deletions db/migrate/20231122054128_create_tipline_requests.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Creates the `tipline_requests` table together with its indexes.
class CreateTiplineRequests < ActiveRecord::Migration[6.1]
def change
create_table :tipline_requests do |t|
# Required, indexed string columns describing the request.
t.string :language, null: false, index: true
t.string :tipline_user_uid, index: true
t.string :platform, null: false, index: true
# smooch_* columns; only smooch_request_type and smooch_data are NOT NULL.
t.string :smooch_request_type, null: false
t.string :smooch_resource_id, null: true
# Defaults to an empty string; see the partial unique index below.
t.string :smooch_message_id, null: true, default: ''
t.string :smooch_conversation_id, null: true
t.jsonb :smooch_data, null: false, default: {}
# Polymorphic owner of the request (associated_type + associated_id), required.
t.references :associated, polymorphic: true, null: false
t.references :team, null: false
# Optional user reference (nullable).
t.references :user
# Integer columns defaulting to 0 — presumably epoch timestamps with 0 meaning
# "not set"; TODO confirm against the code that writes them.
t.integer :smooch_report_received_at, default: 0
t.integer :smooch_report_update_received_at, default: 0
t.integer :smooch_report_correction_sent_at, default: 0
t.integer :smooch_report_sent_at, default: 0
t.timestamps
end
# NOTE(review): `t.references :associated, polymorphic: true` may already create an
# index on these two columns (Rails default) — confirm this one is not a duplicate.
add_index :tipline_requests, [:associated_type, :associated_id]
# Partial unique index: uniqueness is enforced only for non-NULL, non-empty
# message ids, so any number of rows may keep the '' default.
add_index :tipline_requests, :smooch_message_id, unique: true, where: "smooch_message_id IS NOT NULL AND smooch_message_id != ''"
end
end
353 changes: 353 additions & 0 deletions lib/tasks/migrate/20231122054128_migrate_tipline_requests.rake
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
namespace :check do
namespace :migrate do
# Parses rake "extras" arguments into a Hash of String keys and String values.
#
# Each element of +args+ is a String made of `key:value` pairs joined by `&`,
# e.g. 'slug:my-team&batch_size:500' => { 'slug' => 'my-team', 'batch_size' => '500' }.
#
# args - Array of Strings (or nil / empty).
#
# Returns a Hash. A pair without a `:` maps its key to nil. Only the first `:`
# separates key from value, so values may themselves contain colons. Pairs with
# an empty key are ignored.
def parse_args(args)
  output = {}
  return output if args.nil? || args.empty?

  args.each do |a|
    a.split('&').each do |pair|
      # Limit to 2 parts so a value such as 'http://x' is kept whole.
      key, value = pair.split(':', 2)
      next if key.nil? || key.empty?

      output[key] = value
    end
  end
  output
end

# Copies the `smooch` annotations attached directly to +team+ (annotated_type
# 'Team') into TiplineRequest rows, processing the annotations in batches.
#
# team       - Team record; its `id` and `slug` are used.
# batch_size - Integer number of annotations loaded per batch.
#
# Returns an Array that is empty when every batch was inserted, or contains
# team.id when at least one `insert_all` raised. Insert errors are reported on
# stdout and do not abort the remaining batches.
def migrate_team_tipline_requests(team, batch_size)
  failed_teams = []
  team_annotations = Annotation.where(annotation_type: 'smooch', annotated_type: 'Team', annotated_id: team.id)
  total_count = team_annotations.count
  return failed_teams unless total_count > 0

  puts "\nMigrating Team requests[#{team.slug}]: #{total_count} requests"
  inserts = 0
  team_annotations.find_in_batches(:batch_size => batch_size) do |annotations|
    print '.'
    smooch_obj = {}
    smooch_user = {}
    obj_requests = Hash.new { |hash, key| hash[key] = [] }
    # Collect request associated id and user id, keyed by annotation id
    annotations.each do |d|
      smooch_obj[d.id] = d.annotated_id
      smooch_user[d.id] = d.annotator_id
    end
    DynamicAnnotation::Field.where(annotation_type: 'smooch', annotation_id: smooch_obj.keys).find_each do |f|
      print '.'
      value = f.value
      # The `smooch_report_received` field feeds two columns:
      # `smooch_report_received_at` and `smooch_report_update_received_at`
      field_name = f.field_name == 'smooch_report_received' ? 'smooch_report_received_at' : f.field_name
      if field_name == 'smooch_data'
        # Single-quoted on purpose: strips the literal escape text \u0000 from the
        # raw JSON to avoid a PG::UntranslatableCharacter exception
        value.gsub!('\u0000', '') if value.is_a?(String)
        value = begin JSON.parse(value) rescue {} end
        # These values live in dedicated TiplineRequest columns.
        # N.B: the smooch_data field id is reused as the TiplineRequest primary key,
        # and some columns get default values to avoid PG errors
        sd_fields = [
          { 'id' => f.id },
          { 'tipline_user_uid' => value.dig('authorId') },
          { 'language' => value.dig('language') || 'en' },
          { 'platform' => value.dig('source', 'type') || 'whatsapp' },
          { 'created_at' => f.created_at },
          { 'updated_at' => f.updated_at },
        ]
        obj_requests[f.annotation_id].concat(sd_fields)
      end
      obj_requests[f.annotation_id] << { field_name => value }
      if field_name == 'smooch_report_received_at' && f.created_at != f.updated_at
        # A field updated after creation also supplies `smooch_report_update_received_at`
        obj_requests[f.annotation_id] << { 'smooch_report_update_received_at' => value }
      end
    end
    requests = []
    obj_requests.each do |d_id, fields|
      # Build the TiplineRequest row; it should include all existing columns
      r = {
        associated_type: 'Team',
        associated_id: smooch_obj[d_id],
        user_id: smooch_user[d_id],
        team_id: team.id,
        smooch_request_type: 'default_requests',
        smooch_resource_id: nil,
        smooch_message_id: '',
        smooch_conversation_id: nil,
        smooch_report_received_at: 0,
        smooch_report_update_received_at: 0,
        smooch_report_correction_sent_at: 0,
        smooch_report_sent_at: 0,
      }.with_indifferent_access
      # Collected field values override the defaults above, in collection order
      fields.each do |raws|
        raws.each { |k, v| r[k] = v }
      end
      requests << r
    end
    next if requests.blank?

    inserts += requests.count
    puts "\nImporting Team requests[#{team.slug}]: #{inserts}/#{total_count}\n"
    begin
      TiplineRequest.insert_all(requests)
    rescue StandardError => e
      # Report the cause instead of swallowing it, then carry on with the next batch
      puts "\nFailed to import Team requests[#{team.slug}]: #{e.class}: #{e.message}"
      failed_teams << team.id unless failed_teams.include?(team.id)
    end
  end
  failed_teams
end

# Builds (but does not insert) TiplineRequest attribute hashes for every
# `smooch` annotation whose annotated record is of +annotated_type+ and whose
# annotated_id is in +ids+.
#
# annotated_type - String stored in `associated_type` (e.g. 'ProjectMedia').
# ids            - ids of the annotated records.
# team_id        - value stored in `team_id` of every row.
#
# Returns an Array of indifferent-access Hashes, one per annotation that has at
# least one dynamic field.
def bulk_import_requests_items(annotated_type, ids, team_id)
  print '.'
  annotated_by_annotation = {}
  annotator_by_annotation = {}
  # Remember, per annotation id, the annotated record and the annotator
  smooch_scope = Annotation.where(annotation_type: 'smooch', annotated_type: annotated_type, annotated_id: ids)
  smooch_scope.find_each do |annotation|
    print '.'
    annotated_by_annotation[annotation.id] = annotation.annotated_id
    annotator_by_annotation[annotation.id] = annotation.annotator_id
  end

  collected = Hash.new { |store, annotation_id| store[annotation_id] = [] }
  field_scope = DynamicAnnotation::Field.where(annotation_type: 'smooch', annotation_id: annotated_by_annotation.keys)
  field_scope.find_each do |field|
    print '.'
    raw = field.value
    # `smooch_report_received` is stored under `smooch_report_received_at`
    # (and possibly `smooch_report_update_received_at`, see below)
    column = field.field_name
    column = 'smooch_report_received_at' if column == 'smooch_report_received'
    if column == 'smooch_data'
      # Avoid PG::UntranslatableCharacter exception
      raw.gsub!('\u0000', '') if raw.is_a?(String)
      raw = begin
        JSON.parse(raw)
      rescue
        {}
      end
      # Values that live in their own TiplineRequest columns. The smooch_data
      # field id becomes the row id; language and platform fall back to defaults.
      collected[field.annotation_id].concat([
        { 'id' => field.id },
        { 'tipline_user_uid' => raw.dig('authorId') },
        { 'language' => raw.dig('language') || 'en' },
        { 'platform' => raw.dig('source', 'type') || 'whatsapp' },
        { 'created_at' => field.created_at },
        { 'updated_at' => field.updated_at },
      ])
    end
    collected[field.annotation_id] << { column => raw }
    next unless column == 'smooch_report_received_at' && field.created_at != field.updated_at

    # The field was modified after creation: also fill the "update received" column
    collected[field.annotation_id] << { 'smooch_report_update_received_at' => raw }
  end

  collected.map do |annotation_id, attribute_pairs|
    # Start from a row holding every column, then overlay the collected values in order
    row = {
      associated_type: annotated_type,
      associated_id: annotated_by_annotation[annotation_id],
      user_id: annotator_by_annotation[annotation_id],
      team_id: team_id,
      smooch_request_type: 'default_requests',
      smooch_resource_id: nil,
      smooch_message_id: '',
      smooch_conversation_id: nil,
      smooch_report_received_at: 0,
      smooch_report_update_received_at: 0,
      smooch_report_correction_sent_at: 0,
      smooch_report_sent_at: 0,
    }.with_indifferent_access
    attribute_pairs.each do |pair|
      pair.each { |name, content| row[name] = content }
    end
    row
  end
end
# Migrate TiplineRequests: copies smooch annotations (Team, TiplineResource and
# ProjectMedia requests) of each team into the TiplineRequest table.
# bundle exec rails check:migrate:migrate_tipline_requests['slug:team_slug&batch_size:batch_size']
#
# Arguments (all optional):
#   slug       - team slug to migrate, or 'custom_slugs' to be prompted on STDIN
#                for a comma-separated list of slugs.
#   batch_size - records per batch (default 1500).
#
# Progress is stored in Rails.cache: without a slug the id of the last processed
# team is saved and the next run resumes after it; with a slug the processed
# team ids are saved and skipped on later runs.
# Insert failures do not abort the task; affected team ids are listed at the end.
task migrate_tipline_requests: :environment do |_t, args|
  started = Time.now.to_i
  data_args = parse_args args.extras
  batch_size = data_args['batch_size'] || 1500
  batch_size = batch_size.to_i
  slug = data_args['slug']
  condition = {}
  last_team_id = Rails.cache.read('check:migrate:migrate_tipline_requests:team_id') || 0
  unless slug.blank?
    # An explicit slug always starts from the first team matching the condition
    last_team_id = 0
    if slug == 'custom_slugs'
      puts "Type team slugs separated by comma then press enter"
      print ">> "
      slug = begin STDIN.gets.chomp.split(',').map{ |s| s.to_s } rescue [] end
      raise "You must call rake task with team slugs" if slug.blank?
    end
    condition = { slug: slug }
  end
  failed_project_media_requests = []
  failed_team_requests = []
  failed_tipline_resource_requests = []
  Team.where(condition).where('id > ?', last_team_id).find_each do |team|
    print '.'
    migrate_teams = Rails.cache.read('check:migrate:migrate_tipline_requests:migrate_teams') || []
    next if migrate_teams.include?(team.id)
    # Migrate Team requests; accumulate failures across teams instead of
    # keeping only the result of the last team
    failed_team_requests |= migrate_team_tipline_requests(team, batch_size)
    # Migrate TiplineResource requests
    total_count = team.tipline_resources.joins("INNER JOIN annotations a ON a.annotated_id = tipline_resources.id")
                      .where("a.annotated_type = ? AND a.annotation_type = ?", 'TiplineResource', 'smooch').count
    if total_count > 0
      puts "\nMigrating TiplineResource requests[#{team.slug}]: #{total_count} requests"
      inserts = 0
      team.tipline_resources.find_in_batches(:batch_size => batch_size) do |items|
        print '.'
        ids = items.map(&:id)
        requests = bulk_import_requests_items('TiplineResource', ids, team.id)
        unless requests.blank?
          inserts += requests.count
          puts "\nImporting TiplineResource[#{team.slug}]: #{inserts}/#{total_count}\n"
          begin
            TiplineRequest.insert_all(requests)
          rescue StandardError => e
            puts "\nFailed to import TiplineResource requests[#{team.slug}]: #{e.class}: #{e.message}"
            failed_tipline_resource_requests << team.id unless failed_tipline_resource_requests.include?(team.id)
          end
        end
      end
    end
    # Migrate ProjectMedia requests
    # Get the total count for team requests
    total_count = team.project_medias.joins("INNER JOIN annotations a ON a.annotated_id = project_medias.id")
                      .where("a.annotated_type = ? AND a.annotation_type = ?", 'ProjectMedia', 'smooch').count
    if total_count > 0
      puts "\nMigrating ProjectMedia requests[#{team.slug}]: #{total_count} requests"
      inserts = 0
      team.project_medias.find_in_batches(:batch_size => batch_size) do |pms|
        print '.'
        ids = pms.map(&:id)
        requests = bulk_import_requests_items('ProjectMedia', ids, team.id)
        unless requests.blank?
          inserts += requests.count
          puts "\nImporting ProjectMedia requests[#{team.slug}]: #{inserts}/#{total_count}\n"
          begin
            TiplineRequest.insert_all(requests)
          rescue StandardError => e
            puts "\nFailed to import ProjectMedia requests[#{team.slug}]: #{e.class}: #{e.message}"
            # Record the failure in the list that is reported at the end of the task
            failed_project_media_requests << team.id unless failed_project_media_requests.include?(team.id)
          end
        end
      end
    end
    unless slug.blank?
      migrate_teams << team.id
      Rails.cache.write('check:migrate:migrate_tipline_requests:migrate_teams', migrate_teams)
    end
    Rails.cache.write('check:migrate:migrate_tipline_requests:team_id', team.id) if slug.blank?
  end
  puts "Failed to import some project media requests related to the following teams #{failed_project_media_requests.inspect}" if failed_project_media_requests.length > 0
  puts "Failed to import some team requests related to the following teams #{failed_team_requests.inspect}" if failed_team_requests.length > 0
  puts "Failed to import some tipline resource requests related to the following teams #{failed_tipline_resource_requests.inspect}" if failed_tipline_resource_requests.length > 0
  minutes = ((Time.now.to_i - started) / 60).to_i
  puts "[#{Time.now}] Done in #{minutes} minutes."
end

# List teams that are not fully migrated, i.e. teams that still have `smooch_data`
# fields (on ProjectMedia smooch annotations) with no TiplineRequest of the same id.
# bundle exec rails check:migrate:migrate_tipline_requests_status[team_slug1, team_slug2, ...]
#
# Without slugs every team is checked. Only ProjectMedia requests are compared.
task migrate_tipline_requests_status: :environment do |_t, args|
  # Missing requests are found by comparing TiplineRequest ids with smooch_data field ids
  slugs = args.extras
  condition = {}
  condition = { slug: slugs } unless slugs.blank?
  logs = []
  Team.where(condition).find_each do |team|
    print '.'
    # pluck reads only the id column instead of instantiating every row
    requests_ids = TiplineRequest.where(team_id: team.id).pluck(:id)
    requests_count = requests_ids.length
    # Left as a relation so it is passed as a subquery to the field lookup below
    smooch_ids = Annotation.where(annotation_type: 'smooch', annotated_type: 'ProjectMedia')
                           .joins("INNER JOIN project_medias pm ON pm.id = annotations.annotated_id")
                           .where('pm.team_id = ?', team.id)
    sd_ids = DynamicAnnotation::Field.where(field_name: 'smooch_data', annotation_type: 'smooch', annotation_id: smooch_ids)
                                     .where.not(id: requests_ids).pluck(:id)
    logs << {id: team.id, slug: team.slug, requests: requests_count, smooch: sd_ids} if sd_ids.length > 0
  end
  puts "List of teams that not fully migrated"
  pp logs
end

# Migrate missing requests related to specific teams: inserts a TiplineRequest for
# every `smooch_data` field (on ProjectMedia smooch annotations) whose id has no
# matching TiplineRequest yet.
# bundle exec rails check:migrate:migrate_tipline_requests_missing_requests['slug:team_slug&batch_size:batch_size']
#
# Arguments:
#   slug       - required team slug; the task raises when it is missing.
#   batch_size - annotations per batch (default 1000).
#
# Unlike the main migration task, an `insert_all` error is not rescued here.
task migrate_tipline_requests_missing_requests: :environment do |_t, args|
  data_args = parse_args args.extras
  slug = data_args['slug']
  raise "You must call rake task with team slugs" if slug.blank?
  condition = { slug: slug }
  # Arguments arrive as Strings; the batch API needs an Integer
  batch_size = (data_args['batch_size'] || 1000).to_i
  started = Time.now.to_i
  Team.where(condition).find_each do |team|
    print '.'
    requests_ids = TiplineRequest.where(team_id: team.id).pluck(:id)
    smooch_ids = Annotation.where(annotation_type: 'smooch', annotated_type: 'ProjectMedia')
                           .joins("INNER JOIN project_medias pm ON pm.id = annotations.annotated_id")
                           .where('pm.team_id = ?', team.id).pluck(:id)
    # [field id, annotation id] of every smooch_data field that was not migrated yet
    missing = DynamicAnnotation::Field.where(field_name: 'smooch_data', annotation_type: 'smooch', annotation_id: smooch_ids)
                                      .where.not(id: requests_ids).pluck(:id, :annotation_id)
    next if missing.empty?

    total_count = missing.length
    inserts = 0
    print '.'
    missing_annotation_ids = missing.map(&:last)
    Annotation.where(id: missing_annotation_ids).find_in_batches(:batch_size => batch_size) do |annotations|
      smooch_pm = {}
      smooch_user = {}
      pm_requests = Hash.new { |hash, key| hash[key] = [] }
      annotations.each do |d|
        print '.'
        smooch_pm[d.id] = d.annotated_id
        smooch_user[d.id] = d.annotator_id
      end
      DynamicAnnotation::Field.where(annotation_type: 'smooch', annotation_id: annotations.map(&:id)).find_each do |f|
        print '.'
        value = f.value
        # `smooch_report_received` is stored under `smooch_report_received_at`
        field_name = f.field_name == 'smooch_report_received' ? 'smooch_report_received_at' : f.field_name
        if field_name == 'smooch_data'
          value.gsub!('\u0000', '') if value.is_a?(String) # Avoid PG::UntranslatableCharacter exception
          value = begin JSON.parse(value) rescue {} end
          # Values stored in dedicated columns; the field id becomes the row id
          data_columns = [
            { 'id' => f.id },
            { 'tipline_user_uid' => value.dig('authorId') },
            { 'language' => value.dig('language') || 'en' },
            { 'platform' => value.dig('source', 'type') || 'whatsapp' },
            { 'created_at' => f.created_at },
            { 'updated_at' => f.updated_at },
          ]
          pm_requests[f.annotation_id].concat(data_columns)
        end
        pm_requests[f.annotation_id] << { field_name => value }
        if field_name == 'smooch_report_received_at' && f.created_at != f.updated_at
          # A field updated after creation also fills `smooch_report_update_received_at`
          pm_requests[f.annotation_id] << { 'smooch_report_update_received_at' => value }
        end
      end
      requests = []
      pm_requests.each do |d_id, fields|
        # Row with every column defaulted; collected values override the defaults in order
        r = {
          associated_type: 'ProjectMedia',
          associated_id: smooch_pm[d_id],
          user_id: smooch_user[d_id],
          team_id: team.id,
          smooch_request_type: 'default_requests',
          smooch_resource_id: nil,
          smooch_message_id: '',
          smooch_conversation_id: nil,
          smooch_report_received_at: 0,
          smooch_report_update_received_at: 0,
          smooch_report_correction_sent_at: 0,
          smooch_report_sent_at: 0,
        }.with_indifferent_access
        fields.each do |raws|
          raws.each { |k, v| r[k] = v }
        end
        requests << r
      end
      next if requests.blank?

      inserts += requests.count
      puts "\n#{team.slug}:: Importing #{inserts}/#{total_count} requests\n"
      TiplineRequest.insert_all(requests)
    end
  end
  minutes = ((Time.now.to_i - started) / 60).to_i
  puts "[#{Time.now}] Done in #{minutes} minutes."
end
end
end
Loading