Skip to content

Commit

Permalink
Cv2 5082 article indexing to presto (#1994)
Browse files Browse the repository at this point in the history
* CV2-5087 move Articles' side-effecting saves to go via presto

* CV2-5082 move article indexing to presto

* resolve test errors

* updates for broken tests

* small tweak

* set to sync

* more fixes

* rename function and revert request

* add response suppression and move to specific path for side effecting requests

* extend similar media to allow for temporary texts

* fix broken test fixture

* revert back to async

* fix another test

* fixes per PR review

* fixes per PR review

* more fixes after review
  • Loading branch information
DGaffney committed Aug 29, 2024
1 parent 0598a50 commit ff95711
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 39 deletions.
95 changes: 67 additions & 28 deletions app/models/concerns/alegre_v2.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require 'active_support/concern'
# Dedicated error class for Alegre timeouts.
# NOTE(review): the raise/rescue sites are not visible in this view — confirm usage there.
class AlegreTimeoutError < StandardError
end
class TemporaryProjectMedia
attr_accessor :team_id, :id, :url, :type
attr_accessor :team_id, :id, :url, :text, :type, :field
def media
media_type_map = {
"claim" => "Claim",
Expand Down Expand Up @@ -36,6 +36,10 @@ def is_video?
# Reports whether this item's +type+ equals the string "audio".
def is_audio?
  type == "audio"
end

# True when the item is an image, an audio file or a video, checked in that order;
# evaluation stops at the first predicate that is truthy.
def is_uploaded_media?
  is_image? || is_audio? || is_video?
end
end

module AlegreV2
Expand All @@ -55,11 +59,18 @@ def sync_path_for_type(type)
end

# Builds the asynchronous similarity endpoint path for an item.
#
# The item's type is resolved once through +get_type+ and the path itself is
# produced by +async_path_for_type+, so both entry points share one path format.
def async_path(project_media)
  async_path_for_type(get_type(project_media))
end

# Returns the asynchronous similarity endpoint path for a raw type name,
# i.e. "/similarity/async/" followed by the string form of +type+.
def async_path_for_type(type)
  format("/similarity/async/%s", type)
end

# Builds the similarity deletion endpoint path for an item.
#
# The item's type is resolved once through +get_type+ and handed to
# +delete_path_for_type+, which owns the actual path format.
def delete_path(project_media)
  delete_path_for_type(get_type(project_media))
end

# Returns the similarity deletion endpoint path for a raw type name:
# the string form of +type+ wrapped as "/<type>/similarity/".
def delete_path_for_type(type)
  format("/%s/similarity/", type)
end

Expand Down Expand Up @@ -122,6 +133,10 @@ def request(method, path, params, retries=3)
end
end

# Sends a "delete" request with an already-built +params+ payload to the
# deletion endpoint for +type+, and returns whatever +request+ returns.
def request_delete_from_raw(params, type)
  endpoint = delete_path_for_type(type)
  request("delete", endpoint, params)
end

# Sends a "delete" request carrying +data+ to the deletion endpoint derived
# from +project_media+ (via +delete_path+), returning +request+'s result.
def request_delete(data, project_media)
  endpoint = delete_path(project_media)
  request("delete", endpoint, data)
end
Expand All @@ -148,28 +163,32 @@ def get_type(project_media)
type
end

# Returns the MD5 hex digest of +value+, or nil when +value+ is nil.
def content_hash_for_value(value)
  return nil if value.nil?

  Digest::MD5.hexdigest(value)
end

# Computes the content hash sent to Alegre for an item/field pair.
#
# Resolution order:
# 1. +field+ is one of Bot::Alegre::ALL_TEXT_SIMILARITY_FIELDS: hash of that field's value
#    (nil when the value is nil, so callers can leave the hash out).
# 2. the item is a link: hash of its media URL.
# 3. the item is a TemporaryProjectMedia: the value cached under "url_sha:<url>".
# 4. the item is uploaded media (image/audio/video): the file name up to its first ".".
# 5. anything else: hash of the field's value converted to a string.
#
# Every hash goes through +content_hash_for_value+ so nil values are never passed to
# Digest::MD5.hexdigest, which raises TypeError on nil.
def content_hash(project_media, field)
  if Bot::Alegre::ALL_TEXT_SIMILARITY_FIELDS.include?(field)
    content_hash_for_value(project_media.send(field))
  elsif project_media.is_link?
    content_hash_for_value(project_media.media.url)
  elsif project_media.is_a?(TemporaryProjectMedia)
    Rails.cache.read("url_sha:#{project_media.url}")
  elsif project_media.is_uploaded_media?
    project_media.media.file.filename.split(".").first
  else
    content_hash_for_value(project_media.send(field).to_s)
  end
end

# Builds the base request payload shared by Alegre calls for an item/field pair.
#
# Always includes :doc_id (from +item_doc_id+) and :context (from +get_context+).
# :content_hash is added only when +content_hash+ returns a non-nil value, so the
# payload never carries an explicit nil hash.
def generic_package(project_media, field)
  content_hash_value = content_hash(project_media, field)
  params = {
    doc_id: item_doc_id(project_media, field),
    context: get_context(project_media, field)
  }
  params[:content_hash] = content_hash_value unless content_hash_value.nil?
  params
end

def delete_package(project_media, field, params={}, quiet=false)
Expand Down Expand Up @@ -267,6 +286,18 @@ def store_package_text(project_media, field, params)
generic_package_text(project_media, field, params)
end

# Posts +params+ to the asynchronous similarity endpoint for +type+, adding a
# :suppress_search_response entry (true unless overridden) to a merged copy of
# the payload. Returns whatever +request+ returns.
def index_async_with_params(params, type, suppress_search_response=true)
  endpoint = async_path_for_type(type)
  payload = params.merge(suppress_search_response: suppress_search_response)
  request("post", endpoint, payload)
end

# Posts an already-built +params+ payload to the synchronous similarity endpoint
# for +type+ and returns +request+'s result.
def get_sync_with_params(params, type)
  endpoint = sync_path_for_type(type)
  request("post", endpoint, params)
end

# Posts an already-built +params+ payload to the asynchronous similarity endpoint
# for +type+ and returns +request+'s result. Unlike +index_async_with_params+ the
# payload is sent exactly as given.
def get_async_with_params(params, type)
  endpoint = async_path_for_type(type)
  request("post", endpoint, params)
end

def get_sync(project_media, field=nil, params={})
request_sync(
store_package(project_media, field, params),
Expand All @@ -286,6 +317,10 @@ def delete(project_media, field=nil, params={})
delete_package(project_media, field, params),
project_media
)
rescue StandardError => e
error = Error.new(e)
Rails.logger.error("[AutoTagger Bot] Exception for event `#{body['event']}`: #{error.class} - #{error.message}")
CheckSentry.notify(error, bot: "alegre", project_media: project_media, params: params, field: field)
end

def get_per_model_threshold(project_media, threshold)
Expand Down Expand Up @@ -485,25 +520,27 @@ def wait_for_results(project_media, args)
end

# Looks up items similar to a piece of media or text and returns the callback response.
#
# Keys read from +args+: :project_media, :media_url, :text, :field, :type,
# :team_ids and :threshold. The whole +args+ hash is also forwarded to
# +wait_for_results+.
#
# When no :project_media is given, a TemporaryProjectMedia stand-in is built from the
# other keys, used for the lookup, and deleted again after the response has been read.
def get_items_with_similar_media_v2(args={})
  project_media = args[:project_media]
  if project_media.nil?
    media_url = args[:media_url]
    text = args[:text]
    project_media = TemporaryProjectMedia.new
    project_media.text = text
    project_media.field = args[:field]
    project_media.url = media_url
    # Text-only requests have no URL, and Digest::MD5.hexdigest(nil) raises TypeError,
    # so the temporary id is derived from the URL when present and from the text otherwise.
    project_media.id = Digest::MD5.hexdigest((media_url || text).to_s).to_i(16)
    project_media.team_id = args[:team_ids]
    project_media.type = args[:type]
  end
  get_similar_items_v2_async(project_media, nil, args[:threshold])
  wait_for_results(project_media, args)
  response = get_similar_items_v2_callback(project_media, nil)
  # Only stand-ins created here are removed; real items are left untouched.
  delete(project_media, nil) if project_media.is_a?(TemporaryProjectMedia)
  response
end

def process_alegre_callback(params)
Expand All @@ -512,9 +549,11 @@ def process_alegre_callback(params)
should_relate = true
if project_media.nil?
project_media = TemporaryProjectMedia.new
project_media.text = params.dig('data', 'item', 'raw', 'text')
project_media.url = params.dig('data', 'item', 'raw', 'url')
project_media.id = params.dig('data', 'item', 'raw', 'context', 'project_media_id')
project_media.team_id = params.dig('data', 'item', 'raw', 'context', 'team_id')
project_media.field = params.dig('data', 'item', 'raw', 'context', 'field')
project_media.type = params['model_type']
should_relate = false
end
Expand Down
4 changes: 4 additions & 0 deletions app/models/concerns/project_media_getters.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def is_image?
self.is_uploaded_image?
end

# Whether this item is uploaded media: an image, an audio file or a video.
# Predicates are tried in that order and evaluation stops at the first truthy one.
def is_uploaded_media?
  is_image? || is_audio? || is_video?
end

# Whether this item counts as text: either a claim or a link.
def is_text?
  is_claim? || is_link?
end
Expand Down
14 changes: 8 additions & 6 deletions app/models/explainer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,26 @@ def self.update_paragraphs_in_alegre(id, previous_paragraphs_count, timestamp)

# Index title
params = {
content_hash: Bot::Alegre.content_hash_for_value(explainer.title),
doc_id: Digest::MD5.hexdigest(['explainer', explainer.id, 'title'].join(':')),
context: base_context.merge({ field: 'title' }),
text: explainer.title,
models: ALEGRE_MODELS_AND_THRESHOLDS.keys,
context: base_context.merge({ field: 'title' })
}
Bot::Alegre.request('post', '/text/similarity/', params)
Bot::Alegre.index_async_with_params(params, "text")

# Index paragraphs
count = 0
explainer.description.to_s.gsub(/\r\n?/, "\n").split(/\n+/).reject{ |paragraph| paragraph.strip.blank? }.each do |paragraph|
count += 1
params = {
content_hash: Bot::Alegre.content_hash_for_value(paragraph.strip),
doc_id: Digest::MD5.hexdigest(['explainer', explainer.id, 'paragraph', count].join(':')),
context: base_context.merge({ paragraph: count }),
text: paragraph.strip,
models: ALEGRE_MODELS_AND_THRESHOLDS.keys,
context: base_context.merge({ paragraph: count })
}
Bot::Alegre.request('post', '/text/similarity/', params)
Bot::Alegre.index_async_with_params(params, "text")
end

# Remove paragraphs that don't exist anymore (we delete after updating in order to avoid race conditions)
Expand All @@ -99,7 +101,7 @@ def self.update_paragraphs_in_alegre(id, previous_paragraphs_count, timestamp)
quiet: true,
context: base_context.merge({ paragraph: count })
}
Bot::Alegre.request('delete', '/text/similarity/', params)
Bot::Alegre.request_delete_from_raw(params, "text")
end
end

Expand All @@ -114,7 +116,7 @@ def self.search_by_similarity(text, language, team_id)
language: language
}
}
response = Bot::Alegre.request('post', '/text/similarity/search/', params)
response = Bot::Alegre.get_async_with_params(params, "text")
results = response['result'].to_a.sort_by{ |result| result['_score'] }
explainer_ids = results.collect{ |result| result.dig('_source', 'context', 'explainer_id').to_i }.uniq.first(3)
explainer_ids.empty? ? Explainer.none : Explainer.where(team_id: team_id, id: explainer_ids)
Expand Down
8 changes: 5 additions & 3 deletions test/models/bot/smooch_6_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def send_message_outside_24_hours_window(template, pm = nil)

test "should submit query without details on tipline bot v2" do
WebMock.stub_request(:post, /\/text\/similarity\/search\//).to_return(body: {}.to_json) # For explainers
WebMock.stub_request(:post, /\/similarity\/async\/text/).to_return(body: {}.to_json) # For explainers
claim = 'This is a test claim'
send_message 'hello', '1', '1', random_string, random_string, claim, random_string, random_string, '1'
assert_saved_query_type 'default_requests'
Expand Down Expand Up @@ -208,6 +209,7 @@ def send_message_outside_24_hours_window(template, pm = nil)
end

test "should submit query with details on tipline bot v2" do
WebMock.stub_request(:post, /\/similarity\/async\/text/).to_return(body: {}.to_json) # For explainers
WebMock.stub_request(:post, /\/text\/similarity\/search\//).to_return(body: {}.to_json) # For explainers
claim = 'This is a test claim'
send_message 'hello', '1', '1', random_string, '2', random_string, claim, '1'
Expand Down Expand Up @@ -285,7 +287,7 @@ def send_message_outside_24_hours_window(template, pm = nil)
end

test "should submit query and handle search error on tipline bot v2" do
WebMock.stub_request(:post, /\/text\/similarity\/search\//).to_return(body: {}.to_json) # For explainers
WebMock.stub_request(:post, /\/similarity\/async\/text/).to_return(body: {}.to_json) # For explainers
CheckSearch.any_instance.stubs(:medias).raises(StandardError)
Sidekiq::Testing.inline! do
send_message 'hello', '1', '1', 'Foo bar', '1'
Expand Down Expand Up @@ -384,7 +386,7 @@ def send_message_outside_24_hours_window(template, pm = nil)
ProjectMedia.any_instance.stubs(:report_status).returns('published')
ProjectMedia.any_instance.stubs(:analysis_published_article_url).returns(random_url)
Bot::Alegre.stubs(:get_merged_similar_items).returns({ create_project_media.id => { score: 0.9 } })
WebMock.stub_request(:post, /\/text\/similarity\/search\//).to_return(body: {}.to_json) # For explainers
WebMock.stub_request(:post, /\/similarity\/async\/text/).to_return(body: {}.to_json) # For explainers
Sidekiq::Testing.inline! do
send_message 'hello', '1', '1', "Foo bar foo bar #{url} foo bar", '1'
end
Expand Down Expand Up @@ -693,7 +695,7 @@ def send_message_outside_24_hours_window(template, pm = nil)
pm = create_project_media team: @team
publish_report(pm, {}, nil, { language: 'pt', use_visual_card: false })
Bot::Smooch.stubs(:get_search_results).returns([pm])
WebMock.stub_request(:post, /\/text\/similarity\/search\//).to_return(body: {}.to_json) # For explainers
WebMock.stub_request(:post, /\/similarity\/async\/text/).to_return(body: {}.to_json) # For explainers
Sidekiq::Testing.inline! do
send_message 'hello', '1', '1', 'Foo bar', '1'
end
Expand Down
1 change: 1 addition & 0 deletions test/models/bot/smooch_7_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ def teardown
end

test "should include claim_description_content in smooch search" do
WebMock.stub_request(:post, 'http://alegre:3100/similarity/async/image').to_return(body: {}.to_json)
WebMock.stub_request(:post, 'http://alegre:3100/text/similarity/').to_return(body: {}.to_json)
RequestStore.store[:skip_cached_field_update] = false
t = create_team
Expand Down
4 changes: 2 additions & 2 deletions test/models/explainer_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ def setup
}

# Index two paragraphs and title when the explainer is created
Bot::Alegre.stubs(:request).with('post', '/text/similarity/', anything).times(3)
Bot::Alegre.stubs(:request).with('post', '/similarity/async/text', anything).times(3)
Bot::Alegre.stubs(:request).with('delete', '/text/similarity/', anything).never
ex = create_explainer description: description

# Update the index when paragraphs change
Bot::Alegre.stubs(:request).with('post', '/text/similarity/', anything).times(2)
Bot::Alegre.stubs(:request).with('post', '/similarity/async/text', anything).times(2)
Bot::Alegre.stubs(:request).with('delete', '/text/similarity/', anything).once
ex = Explainer.find(ex.id)
ex.description = 'Now this is the only paragraph'
Expand Down

0 comments on commit ff95711

Please sign in to comment.