diff --git a/Gemfile.lock b/Gemfile.lock deleted file mode 100644 index bfa88ddc..00000000 --- a/Gemfile.lock +++ /dev/null @@ -1,217 +0,0 @@ -GIT - remote: https://github.com/ncbo/goo.git - revision: 5d911f587f22059581ae3e713e5fe9bde9de82e7 - branch: develop - specs: - goo (0.0.2) - addressable (~> 2.8) - pry - rdf (= 1.0.8) - redis - request_store - rest-client - rsolr - sparql-client - uuid - -GIT - remote: https://github.com/ncbo/sparql-client.git - revision: 1657f0dd69fd4b522d3549a6848670175f5e98cc - branch: develop - specs: - sparql-client (1.0.1) - json_pure (>= 1.4) - net-http-persistent (= 2.9.4) - rdf (>= 1.0) - -GEM - remote: https://rubygems.org/ - specs: - activesupport (4.0.13) - i18n (~> 0.6, >= 0.6.9) - minitest (~> 4.2) - multi_json (~> 1.3) - thread_safe (~> 0.1) - tzinfo (~> 0.3.37) - addressable (2.8.6) - public_suffix (>= 2.0.2, < 6.0) - ansi (1.5.0) - ast (2.4.2) - base64 (0.2.0) - bcrypt (3.1.20) - bigdecimal (3.1.7) - builder (3.2.4) - coderay (1.1.3) - concurrent-ruby (1.2.3) - connection_pool (2.4.1) - cube-ruby (0.0.3) - daemons (1.4.1) - date (3.3.4) - docile (1.4.0) - domain_name (0.6.20240107) - email_spec (2.2.2) - htmlentities (~> 4.3.3) - launchy (~> 2.1) - mail (~> 2.7) - eventmachine (1.2.7) - faraday (2.8.1) - base64 - faraday-net_http (>= 2.0, < 3.1) - ruby2_keywords (>= 0.0.4) - faraday-net_http (3.0.2) - ffi (1.16.3) - hashie (5.0.0) - htmlentities (4.3.4) - http-accept (1.7.0) - http-cookie (1.0.5) - domain_name (~> 0.5) - i18n (0.9.5) - concurrent-ruby (~> 1.0) - json (2.7.2) - json_pure (2.7.2) - language_server-protocol (3.17.0.3) - launchy (2.5.2) - addressable (~> 2.8) - libxml-ruby (5.0.3) - logger (1.6.0) - macaddr (1.7.2) - systemu (~> 2.6.5) - mail (2.8.1) - mini_mime (>= 0.1.1) - net-imap - net-pop - net-smtp - method_source (1.1.0) - mime-types (3.5.2) - mime-types-data (~> 3.2015) - mime-types-data (3.2024.0305) - mini_mime (1.1.5) - minitest (4.7.5) - minitest-reporters (0.14.24) - ansi - builder - minitest (>= 2.12, < 5.0) - powerbar - multi_json (1.15.0) - net-http-persistent (2.9.4) - net-imap (0.4.10) - date - net-protocol - net-pop (0.1.2) - net-protocol - net-protocol (0.2.2) - timeout - net-smtp (0.5.0) - net-protocol - netrc (0.11.0) - oj (3.16.3) - bigdecimal (>= 3.0) - omni_logger (0.1.4) - logger - parallel (1.24.0) - parser (3.3.0.5) - ast (~> 2.4.1) - racc - pony (1.13.1) - mail (>= 2.0) - powerbar (2.0.1) - hashie (>= 1.1.0) - pry (0.14.2) - coderay (~> 1.1) - method_source (~> 1.0) - public_suffix (5.0.5) - racc (1.7.3) - rack (2.2.9) - rack-test (0.8.3) - rack (>= 1.0, < 3) - rainbow (3.1.1) - rake (10.5.0) - rdf (1.0.8) - addressable (>= 2.2) - redis (5.2.0) - redis-client (>= 0.22.0) - redis-client (0.22.1) - connection_pool - regexp_parser (2.9.0) - request_store (1.6.0) - rack (>= 1.4) - rest-client (2.1.0) - http-accept (>= 1.7.0, < 2.0) - http-cookie (>= 1.0.2, < 2.0) - mime-types (>= 1.16, < 4.0) - netrc (~> 0.8) - rexml (3.2.6) - rsolr (2.6.0) - builder (>= 2.1.2) - faraday (>= 0.9, < 3, != 2.0.0) - rubocop (1.63.3) - json (~> 2.3) - language_server-protocol (>= 3.17.0) - parallel (~> 1.10) - parser (>= 3.3.0.2) - rainbow (>= 2.2.2, < 4.0) - regexp_parser (>= 1.8, < 3.0) - rexml (>= 3.2.5, < 4.0) - rubocop-ast (>= 1.31.1, < 2.0) - ruby-progressbar (~> 1.7) - unicode-display_width (>= 2.4.0, < 3.0) - rubocop-ast (1.31.2) - parser (>= 3.3.0.4) - ruby-progressbar (1.13.0) - ruby2_keywords (0.0.5) - rubyzip (1.3.0) - simplecov (0.22.0) - docile (~> 1.1) - simplecov-html (~> 0.11) - simplecov_json_formatter (~> 0.1) - 
simplecov-cobertura (2.1.0) - rexml - simplecov (~> 0.19) - simplecov-html (0.12.3) - simplecov_json_formatter (0.1.4) - systemu (2.6.5) - thin (1.8.2) - daemons (~> 1.0, >= 1.0.9) - eventmachine (~> 1.0, >= 1.0.4) - rack (>= 1, < 3) - thread_safe (0.3.6) - timeout (0.4.1) - tzinfo (0.3.62) - unicode-display_width (2.5.0) - uuid (2.3.9) - macaddr (~> 1.0) - -PLATFORMS - ruby - x86_64-darwin-18 - -DEPENDENCIES - activesupport (~> 4) - addressable (~> 2.8) - bcrypt (~> 3.0) - cube-ruby - email_spec - ffi - goo! - libxml-ruby - minitest (~> 4) - minitest-reporters (>= 0.5.0) - multi_json (~> 1.0) - oj (~> 3.0) - omni_logger - pony - pry - rack - rack-test (~> 0.6) - rake (~> 10.0) - rest-client - rsolr - rubocop - rubyzip (~> 1.0) - simplecov - simplecov-cobertura - sparql-client! - thin - -BUNDLED WITH - 2.4.22 diff --git a/lib/ontologies_linked_data.rb b/lib/ontologies_linked_data.rb index fda99285..15c2fee9 100644 --- a/lib/ontologies_linked_data.rb +++ b/lib/ontologies_linked_data.rb @@ -1,11 +1,11 @@ -require "goo" +require 'goo' # Make sure we're in the load path -lib_dir = File.dirname(__FILE__)+"/../lib" +lib_dir = "#{File.dirname(__FILE__)}/../lib" $LOAD_PATH.unshift lib_dir unless $LOAD_PATH.include?(lib_dir) # Setup Goo (repo connection and namespaces) -require "ontologies_linked_data/config/config" +require 'ontologies_linked_data/config/config' # Include other dependent code require "ontologies_linked_data/security/authorization" @@ -31,27 +31,47 @@ require "ontologies_linked_data/metrics/metrics" # Require base model -require "ontologies_linked_data/models/base" +require 'ontologies_linked_data/models/base' -# Require all models + + + +# Require all models and services project_root = File.dirname(File.absolute_path(__FILE__)) +# Require base services +require 'ontologies_linked_data/services/submission_process/submission_process' + +# We need to require deterministic - that is why we have the sort. + +models = Dir.glob("#{project_root}/ontologies_linked_data/services/**/*.rb").sort +models.each do |m| + require m +end + +# We need to require deterministic - that is why we have the sort. +models = Dir.glob("#{project_root}/ontologies_linked_data/models/concerns//**/*.rb").sort +models.each do |m| + require m +end models = Dir.glob("#{project_root}/ontologies_linked_data/concerns/**/*.rb").sort models.each do |m| require m end # We need to require deterministic - that is why we have the sort. 
-models = Dir.glob(project_root + '/ontologies_linked_data/models/**/*.rb').sort +models = Dir.glob("#{project_root}/ontologies_linked_data/models/**/*.rb").sort models.each do |m| require m end + + module LinkedData def rootdir File.dirname(File.absolute_path(__FILE__)) end def bindir - File.expand_path(rootdir + '/../bin') + File.expand_path("#{rootdir}/../bin") end end diff --git a/lib/ontologies_linked_data/diff/bubastis_diff.rb b/lib/ontologies_linked_data/diff/bubastis_diff.rb index 9a21b2d6..ff72ffb4 100644 --- a/lib/ontologies_linked_data/diff/bubastis_diff.rb +++ b/lib/ontologies_linked_data/diff/bubastis_diff.rb @@ -10,7 +10,7 @@ class InputFileNotFoundError < Diff::DiffException class DiffFileNotGeneratedException < Diff::DiffException end - class BubastisDiffCommand + class BubastisDiffCommand < DiffTool # Bubastis version 1.2 # 18th December 2014 @@ -37,15 +37,33 @@ class BubastisDiffCommand # Loading one file locally and one from the web and outputting results to plain text: # java -jar bubastis_1_2.jar -ontology1 "H://disease_ontology_version_1.owl" -ontology2 "http://www.disease.org/diseaseontology_latest.owl" -output "C://my_diff.txt" - def initialize(input_fileOld, input_fileNew, output_repo) + def initialize(old_file_path, new_file_path) @bubastis_jar_path = LinkedData.bindir + "/bubastis.jar" - @input_fileOld = input_fileOld - @input_fileNew = input_fileNew - @output_repo = output_repo + @input_fileOld = old_file_path + @input_fileNew = new_file_path + @output_repo = File.expand_path(@input_fileNew).gsub(File.basename(@input_fileNew),'') @file_diff_path = nil @java_heap_size = LinkedData.settings.java_max_heap_size end + + def file_diff_path + @file_diff_path + end + + def diff + setup_environment + call_bubastis_java_cmd + if @file_diff_path.nil? + raise DiffFileNotGeneratedException, "Diff file nil" + elsif not File.exist?(@file_diff_path) + raise DiffFileNotGeneratedException, "Diff file not found in #{@file_diff_path}" + end + return @file_diff_path + end + + private + def setup_environment if @input_fileOld.nil? or (not File.exist?(@input_fileOld)) raise InputFileNotFoundError, "#{@input_fileOld} not found." @@ -105,21 +123,6 @@ def call_bubastis_java_cmd end return @file_diff_path end - - def file_diff_path - @file_diff_path - end - - def diff - setup_environment - call_bubastis_java_cmd - if @file_diff_path.nil? 
- raise DiffFileNotGeneratedException, "Diff file nil" - elsif not File.exist?(@file_diff_path) - raise DiffFileNotGeneratedException, "Diff file not found in #{@file_diff_path}" - end - return @file_diff_path - end end end end diff --git a/lib/ontologies_linked_data/diff/diff.rb b/lib/ontologies_linked_data/diff/diff.rb index 32b054f8..3c89326e 100644 --- a/lib/ontologies_linked_data/diff/diff.rb +++ b/lib/ontologies_linked_data/diff/diff.rb @@ -1,8 +1,20 @@ module LinkedData module Diff - class <") + hop = hop.sub('parent', "<#{root.to_s}>") else - hop = hop.sub("parent", "?x#{i-1}") + hop = hop.sub('parent', "?x#{i-1}") end hops << hop vars << "?x#{i}" end joins = hops.join(".\n") - vars = vars.join(" ") + vars = vars.join(' ') query = < { @@ -257,7 +263,7 @@ def self.hierarchy_depth?(graph,root,n,treeProp) def self.query_count_definitions(subId,defProps) propFilter = defProps.map { |x| "?p = <#{x.to_s}>" } - propFilter = propFilter.join " || " + propFilter = propFilter.join ' || ' query = <<-eos SELECT (count(DISTINCT ?s) as ?c) WHERE { GRAPH <#{subId.to_s}> { @@ -268,7 +274,7 @@ def self.query_count_definitions(subId,defProps) FILTER (?s != <#{Goo.namespaces[:owl][:Thing]}>) }} eos - query = query.sub("properties", propFilter) + query = query.sub('properties', propFilter) rs = Goo.sparql_query_client.query(query) rs.each do |sol| return sol[:c].object diff --git a/lib/ontologies_linked_data/models/concerns/submission_process.rb b/lib/ontologies_linked_data/models/concerns/submission_process.rb new file mode 100644 index 00000000..fa19df40 --- /dev/null +++ b/lib/ontologies_linked_data/models/concerns/submission_process.rb @@ -0,0 +1,40 @@ +module LinkedData + module Concerns + module SubmissionProcessable + + def process_submission(logger, options={}) + LinkedData::Services::OntologyProcessor.new(self).process(logger, options) + end + + def diff(logger, older) + LinkedData::Services::SubmissionDiffGenerator.new(self).diff(logger, older) + end + + def generate_diff(logger) + LinkedData::Services::SubmissionDiffGenerator.new(self).process(logger) + end + + def index(logger, commit: true, optimize: true) + LinkedData::Services::OntologySubmissionIndexer.new(self).process(logger, commit: commit, optimize: optimize) + end + + def index_properties(logger, commit: true, optimize: true) + LinkedData::Services::SubmissionPropertiesIndexer.new(self).process(logger, commit: commit, optimize: optimize) + end + + def archive + LinkedData::Services::OntologySubmissionArchiver.new(self ).process + end + + def generate_rdf(logger, reasoning: true) + LinkedData::Services::SubmissionRDFGenerator.new(self).process(logger, reasoning: reasoning) + end + + def generate_metrics(logger) + LinkedData::Services::SubmissionMetricsCalculator.new(self).process(logger) + end + + end + end +end + diff --git a/lib/ontologies_linked_data/models/ontology_submission.rb b/lib/ontologies_linked_data/models/ontology_submission.rb index 41691c1a..2cfe58c6 100644 --- a/lib/ontologies_linked_data/models/ontology_submission.rb +++ b/lib/ontologies_linked_data/models/ontology_submission.rb @@ -12,6 +12,7 @@ module Models class OntologySubmission < LinkedData::Models::Base + include LinkedData::Concerns::SubmissionProcessable include LinkedData::Concerns::OntologySubmission::MetadataExtractor FILES_TO_DELETE = ['labels.ttl', 'mappings.ttl', 'obsolete.ttl', 'owlapi.xrdf', 'errors.log'] @@ -329,37 +330,6 @@ def unzip_submission(logger) zip_dst end - def delete_old_submission_files - path_to_repo = data_folder - submission_files = 
FILES_TO_DELETE.map { |f| File.join(path_to_repo, f) } - submission_files.push(csv_path) - submission_files.push(parsing_log_path) unless parsing_log_path.nil? - FileUtils.rm(submission_files, force: true) - end - - # accepts another submission in 'older' (it should be an 'older' ontology version) - def diff(logger, older) - begin - bring_remaining - bring :diffFilePath if bring? :diffFilePath - older.bring :uploadFilePath if older.bring? :uploadFilePath - - LinkedData::Diff.logger = logger - bubastis = LinkedData::Diff::BubastisDiffCommand.new( - File.expand_path(older.master_file_path), - File.expand_path(self.master_file_path), - data_folder - ) - self.diffFilePath = bubastis.diff - save - logger.info("Bubastis diff generated successfully for #{self.id}") - logger.flush - rescue Exception => e - logger.error("Bubastis diff for #{self.id} failed - #{e.class}: #{e.message}") - logger.flush - raise e - end - end def class_count(logger=nil) logger ||= LinkedData::Parser.logger || Logger.new($stderr) @@ -420,397 +390,6 @@ def metrics_from_file(logger=nil) metrics end - def generate_metrics_file(class_count, indiv_count, prop_count, max_depth) - CSV.open(self.metrics_path, "wb") do |csv| - csv << ["Class Count", "Individual Count", "Property Count", "Max Depth"] - csv << [class_count, indiv_count, prop_count, max_depth] - end - end - - def generate_umls_metrics_file(tr_file_path=nil) - tr_file_path ||= triples_file_path - class_count = 0 - indiv_count = 0 - prop_count = 0 - max_depth = 0 - - File.foreach(tr_file_path) do |line| - class_count += 1 if line =~ /owl:Class/ - indiv_count += 1 if line =~ /owl:NamedIndividual/ - prop_count += 1 if line =~ /owl:ObjectProperty/ - prop_count += 1 if line =~ /owl:DatatypeProperty/ - end - - # Get max depth from the metrics.csv file which is already generated - # by owlapi_wrapper when new submission of UMLS ontology is created. - # Ruby code/sparql for calculating max_depth fails for large UMLS - # ontologie with AllegroGraph backend - metrics_from_owlapi = metrics_from_file - max_depth = metrics_from_owlapi[1][3] unless metrics_from_owlapi.empty? - - generate_metrics_file(class_count, indiv_count, prop_count, max_depth) - end - - def generate_rdf(logger, reasoning: true) - mime_type = nil - - if self.hasOntologyLanguage.umls? 
- triples_file_path = self.triples_file_path - logger.info("UMLS turtle file found; doing OWLAPI parse to extract metrics") - logger.flush - mime_type = LinkedData::MediaTypes.media_type_from_base(LinkedData::MediaTypes::TURTLE) - generate_umls_metrics_file(triples_file_path) - else - output_rdf = self.rdf_path - - if File.exist?(output_rdf) - logger.info("deleting old owlapi.xrdf ..") - deleted = FileUtils.rm(output_rdf) - - if deleted.length > 0 - logger.info("deleted") - else - logger.info("error deleting owlapi.rdf") - end - end - owlapi = owlapi_parser(logger: logger) - owlapi.disable_reasoner unless reasoning - triples_file_path, missing_imports = owlapi.parse - - if missing_imports && missing_imports.length > 0 - self.missingImports = missing_imports - - missing_imports.each do |imp| - logger.info("OWL_IMPORT_MISSING: #{imp}") - end - else - self.missingImports = nil - end - logger.flush - # debug code when you need to avoid re-generating the owlapi.xrdf file, - # comment out the block above and uncomment the line below - # triples_file_path = output_rdf - end - - begin - delete_and_append(triples_file_path, logger, mime_type) - rescue => e - logger.error("Error sending data to triple store - #{e.response.code} #{e.class}: #{e.response.body}") if e.response&.body - raise e - end - end - - - def process_callbacks(logger, callbacks, action_name, &block) - callbacks.delete_if do |_, callback| - begin - if callback[action_name] - callable = self.method(callback[action_name]) - yield(callable, callback) - end - false - rescue Exception => e - logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") - logger.flush - - if callback[:status] - add_submission_status(callback[:status].get_error_status) - self.save - end - - # halt the entire processing if :required is set to true - raise e if callback[:required] - # continue processing of other callbacks, but not this one - true - end - end - end - - - def loop_classes(logger, raw_paging, callbacks) - page = 1 - size = 2500 - count_classes = 0 - acr = self.id.to_s.split("/")[-1] - operations = callbacks.values.map { |v| v[:op_name] }.join(", ") - - time = Benchmark.realtime do - paging = raw_paging.page(page, size) - cls_count_set = false - cls_count = class_count(logger) - - if cls_count > -1 - # prevent a COUNT SPARQL query if possible - paging.page_count_set(cls_count) - cls_count_set = true - else - cls_count = 0 - end - - iterate_classes = false - # 1. init artifacts hash if not explicitly passed in the callback - # 2. determine if class-level iteration is required - callbacks.each { |_, callback| callback[:artifacts] ||= {}; iterate_classes = true if callback[:caller_on_each] } - - process_callbacks(logger, callbacks, :caller_on_pre) { - |callable, callback| callable.call(callback[:artifacts], logger, paging) } - - page_len = -1 - prev_page_len = -1 - - begin - t0 = Time.now - page_classes = paging.page(page, size).all - total_pages = page_classes.total_pages - page_len = page_classes.length - - # nothing retrieved even though we're expecting more records - if total_pages > 0 && page_classes.empty? && (prev_page_len == -1 || prev_page_len == size) - j = 0 - num_calls = LinkedData.settings.num_retries_4store - - while page_classes.empty? && j < num_calls do - j += 1 - logger.error("Empty page encountered. Retrying #{j} times...") - sleep(2) - page_classes = paging.page(page, size).all - logger.info("Success retrieving a page of #{page_classes.length} classes after retrying #{j} times...") unless page_classes.empty? 
- end - - if page_classes.empty? - msg = "Empty page #{page} of #{total_pages} persisted after retrying #{j} times. #{operations} of #{acr} aborted..." - logger.error(msg) - raise msg - end - end - - if page_classes.empty? - if total_pages > 0 - logger.info("The number of pages reported for #{acr} - #{total_pages} is higher than expected #{page - 1}. Completing #{operations}...") - else - logger.info("Ontology #{acr} contains #{total_pages} pages...") - end - break - end - - prev_page_len = page_len - logger.info("#{acr}: page #{page} of #{total_pages} - #{page_len} ontology terms retrieved in #{Time.now - t0} sec.") - logger.flush - count_classes += page_classes.length - - process_callbacks(logger, callbacks, :caller_on_pre_page) { - |callable, callback| callable.call(callback[:artifacts], logger, paging, page_classes, page) } - - page_classes.each { |c| - process_callbacks(logger, callbacks, :caller_on_each) { - |callable, callback| callable.call(callback[:artifacts], logger, paging, page_classes, page, c) } - } if iterate_classes - - process_callbacks(logger, callbacks, :caller_on_post_page) { - |callable, callback| callable.call(callback[:artifacts], logger, paging, page_classes, page) } - cls_count += page_classes.length unless cls_count_set - - page = page_classes.next? ? page + 1 : nil - # page = nil if page > 20 # uncomment for testing fewer pages - end while !page.nil? - - callbacks.each { |_, callback| callback[:artifacts][:count_classes] = cls_count } - process_callbacks(logger, callbacks, :caller_on_post) { - |callable, callback| callable.call(callback[:artifacts], logger, paging) } - end - - logger.info("Completed #{operations}: #{acr} in #{time} sec. #{count_classes} classes.") - logger.flush - - # set the status on actions that have completed successfully - callbacks.each do |_, callback| - if callback[:status] - add_submission_status(callback[:status]) - self.save - end - end - end - - def generate_missing_labels_pre(artifacts={}, logger, paging) - file_path = artifacts[:file_path] - artifacts[:save_in_file] = File.join(File.dirname(file_path), "labels.ttl") - artifacts[:save_in_file_mappings] = File.join(File.dirname(file_path), "mappings.ttl") - # troubleshooting code to output the class ids used in pagination - # class_list_file = File.join(File.dirname(file_path), "class_ids.ttl") - # f_class_list = File.open(class_list_file, "w") - # artifacts[:class_list] = f_class_list - property_triples = LinkedData::Utils::Triples.rdf_for_custom_properties(self) - Goo.sparql_data_client.append_triples(self.id, property_triples, mime_type="application/x-turtle") - fsave = File.open(artifacts[:save_in_file], "w") - fsave.write("#{property_triples}\n") - fsave_mappings = File.open(artifacts[:save_in_file_mappings], "w") - artifacts[:fsave] = fsave - artifacts[:fsave_mappings] = fsave_mappings - end - - def generate_missing_labels_pre_page(artifacts={}, logger, paging, page_classes, page) - artifacts[:label_triples] = [] - artifacts[:mapping_triples] = [] - end - - def generate_missing_labels_each(artifacts={}, logger, paging, page_classes, page, c) - prefLabel = nil - # troubleshooting code to output the class ids used in pagination - # logger.info("Generated label for class: #{c.id.to_s}") - # artifacts[:class_list].write(c.id.to_s + "\n") - - if c.prefLabel.nil? - rdfs_labels = c.label - - if rdfs_labels && rdfs_labels.length > 1 && c.synonym.length > 0 - rdfs_labels = (Set.new(c.label) - Set.new(c.synonym)).to_a.first - - if rdfs_labels.nil? 
|| rdfs_labels.length == 0 - rdfs_labels = c.label - end - end - - if rdfs_labels and not (rdfs_labels.instance_of? Array) - rdfs_labels = [rdfs_labels] - end - label = nil - - if rdfs_labels && rdfs_labels.length > 0 - # this sort is needed for a predictable label selection - label = rdfs_labels.sort[0] - else - label = LinkedData::Utils::Triples.last_iri_fragment c.id.to_s - end - artifacts[:label_triples] << LinkedData::Utils::Triples.label_for_class_triple( - c.id, Goo.vocabulary(:metadata_def)[:prefLabel], label) - prefLabel = label - else - prefLabel = c.prefLabel - end - - if self.ontology.viewOf.nil? - loomLabel = OntologySubmission.loom_transform_literal(prefLabel.to_s) - - if loomLabel.length > 2 - artifacts[:mapping_triples] << LinkedData::Utils::Triples.loom_mapping_triple( - c.id, Goo.vocabulary(:metadata_def)[:mappingLoom], loomLabel) - end - artifacts[:mapping_triples] << LinkedData::Utils::Triples.uri_mapping_triple( - c.id, Goo.vocabulary(:metadata_def)[:mappingSameURI], c.id) - end - end - - def generate_missing_labels_post_page(artifacts={}, logger, paging, page_classes, page) - rest_mappings = LinkedData::Mappings.migrate_rest_mappings(self.ontology.acronym) - artifacts[:mapping_triples].concat(rest_mappings) - - if artifacts[:label_triples].length > 0 - logger.info("Writing #{artifacts[:label_triples].length} labels to file for #{self.id.to_ntriples}") - logger.flush - artifacts[:label_triples] = artifacts[:label_triples].join("\n") - artifacts[:fsave].write(artifacts[:label_triples]) - else - logger.info("No labels generated in page #{page}.") - logger.flush - end - - if artifacts[:mapping_triples].length > 0 - logger.info("Writing #{artifacts[:mapping_triples].length} mapping labels to file for #{self.id.to_ntriples}") - logger.flush - artifacts[:mapping_triples] = artifacts[:mapping_triples].join("\n") - artifacts[:fsave_mappings].write(artifacts[:mapping_triples]) - end - end - - def generate_missing_labels_post(artifacts={}, logger, paging) - logger.info("end generate_missing_labels traversed #{artifacts[:count_classes]} classes") - logger.info("Saved generated labels in #{artifacts[:save_in_file]}") - artifacts[:fsave].close() - artifacts[:fsave_mappings].close() - - all_labels = File.read(artifacts[:fsave].path) - - unless all_labels.empty? - t0 = Time.now - Goo.sparql_data_client.append_triples(self.id, all_labels, mime_type="application/x-turtle") - t1 = Time.now - logger.info("Wrote #{all_labels.lines.count} labels for #{self.id.to_ntriples} to triple store in #{t1 - t0} sec.") - logger.flush - end - - all_mapping_labels = File.read(artifacts[:fsave_mappings].path) - - unless all_mapping_labels.empty? - t0 = Time.now - Goo.sparql_data_client.append_triples(self.id, all_mapping_labels, mime_type="application/x-turtle") - t1 = Time.now - logger.info("Wrote #{all_mapping_labels.lines.count} mapping labels for #{self.id.to_ntriples} to triple store in #{t1 - t0} sec.") - logger.flush - end - - # troubleshooting code to output the class ids used in pagination - # artifacts[:class_list].close() - end - - def generate_obsolete_classes(logger, file_path) - self.bring(:obsoleteProperty) if self.bring?(:obsoleteProperty) - self.bring(:obsoleteParent) if self.bring?(:obsoleteParent) - classes_deprecated = [] - if self.obsoleteProperty && - self.obsoleteProperty.to_s != "http://www.w3.org/2002/07/owl#deprecated" - - predicate_obsolete = RDF::URI.new(self.obsoleteProperty.to_s) - query_obsolete_predicate = < 0 - classes_deprecated.uniq! 
- logger.info("Asserting owl:deprecated statement for #{classes_deprecated} classes") - save_in_file = File.join(File.dirname(file_path), "obsolete.ttl") - fsave = File.open(save_in_file,"w") - classes_deprecated.each do |class_id| - fsave.write(LinkedData::Utils::Triples.obselete_class_triple(class_id) + "\n") - end - fsave.close() - result = Goo.sparql_data_client.append_triples_from_file( - self.id, - save_in_file, - mime_type="application/x-turtle") - end - end - def add_submission_status(status) valid = status.is_a?(LinkedData::Models::SubmissionStatus) raise ArgumentError, "The status being added is not SubmissionStatus object" unless valid @@ -891,499 +470,6 @@ def archived? return ready?(status: [:archived]) end - ################################################################ - # Possible options with their defaults: - # process_rdf = false - # generate_labels = true - # index_search = false - # index_properties = false - # index_commit = false - # run_metrics = false - # reasoning = false - # diff = false - # archive = false - # if no options passed, ALL actions, except for archive = true - ################################################################ - def process_submission(logger, options={}) - # Wrap the whole process so we can email results - begin - process_rdf = false - generate_labels = false - index_search = false - index_properties = false - index_commit = false - run_metrics = false - reasoning = false - diff = false - archive = false - - if options.empty? - process_rdf = true - generate_labels = true - index_search = true - index_properties = true - index_commit = true - run_metrics = true - reasoning = true - diff = true - archive = false - else - process_rdf = options[:process_rdf] == true ? true : false - - if options.has_key?(:generate_labels) - generate_labels = options[:generate_labels] == false ? false : true - else - generate_labels = process_rdf - end - index_search = options[:index_search] == true ? true : false - index_properties = options[:index_properties] == true ? true : false - run_metrics = options[:run_metrics] == true ? true : false - - if !process_rdf || options[:reasoning] == false - reasoning = false - else - reasoning = true - end - - if (!index_search && !index_properties) || options[:index_commit] == false - index_commit = false - else - index_commit = true - end - - diff = options[:diff] == true ? true : false - archive = options[:archive] == true ? true : false - end - - self.bring_remaining - self.ontology.bring_remaining - - logger.info("Starting to process #{self.ontology.acronym}/submissions/#{self.submissionId}") - logger.flush - LinkedData::Parser.logger = logger - status = nil - - if archive - self.submissionStatus = nil - status = LinkedData::Models::SubmissionStatus.find("ARCHIVED").first - add_submission_status(status) - - # Delete everything except for original ontology file. - ontology.bring(:submissions) - submissions = ontology.submissions - unless submissions.nil? - submissions.each { |s| s.bring(:submissionId) } - submission = submissions.sort { |a,b| b.submissionId <=> a.submissionId }[0] - # Don't perform deletion if this is the most recent submission. - if (self.submissionId < submission.submissionId) - delete_old_submission_files - end - end - else - if process_rdf - # Remove processing status types before starting RDF parsing etc. - self.submissionStatus = nil - status = LinkedData::Models::SubmissionStatus.find("UPLOADED").first - add_submission_status(status) - self.save - - # Parse RDF - begin - if not self.valid? 
- error = "Submission is not valid, it cannot be processed. Check errors." - raise ArgumentError, error - end - if not self.uploadFilePath - error = "Submission is missing an ontology file, cannot parse." - raise ArgumentError, error - end - status = LinkedData::Models::SubmissionStatus.find("RDF").first - remove_submission_status(status) #remove RDF status before starting - generate_rdf(logger, reasoning: reasoning) - extract_metadata - add_submission_status(status) - self.save - rescue Exception => e - logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") - logger.flush - add_submission_status(status.get_error_status) - self.save - # If RDF generation fails, no point of continuing - raise e - end - - status = LinkedData::Models::SubmissionStatus.find("OBSOLETE").first - begin - generate_obsolete_classes(logger, self.uploadFilePath.to_s) - add_submission_status(status) - self.save - rescue Exception => e - logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") - logger.flush - add_submission_status(status.get_error_status) - self.save - # if obsolete fails the parsing fails - raise e - end - end - - if generate_labels - parsed_rdf = ready?(status: [:rdf]) - raise Exception, "Labels for submission #{self.ontology.acronym}/submissions/#{self.submissionId} cannot be generated because it has not been successfully entered into the triple store" unless parsed_rdf - status = LinkedData::Models::SubmissionStatus.find("RDF_LABELS").first - begin - callbacks = { - missing_labels: { - op_name: "Missing Labels Generation", - required: true, - status: status, - artifacts: { - file_path: self.uploadFilePath.to_s - }, - caller_on_pre: :generate_missing_labels_pre, - caller_on_pre_page: :generate_missing_labels_pre_page, - caller_on_each: :generate_missing_labels_each, - caller_on_post_page: :generate_missing_labels_post_page, - caller_on_post: :generate_missing_labels_post - } - } - - raw_paging = LinkedData::Models::Class.in(self).include(:prefLabel, :synonym, :label) - loop_classes(logger, raw_paging, callbacks) - rescue Exception => e - logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") - logger.flush - add_submission_status(status.get_error_status) - ensure - self.save - end - end - - parsed = ready?(status: [:rdf, :rdf_labels]) - - if index_search - raise Exception, "The submission #{self.ontology.acronym}/submissions/#{self.submissionId} cannot be indexed because it has not been successfully parsed" unless parsed - status = LinkedData::Models::SubmissionStatus.find("INDEXED").first - begin - index(logger, index_commit, false) - add_submission_status(status) - rescue Exception => e - logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") - logger.flush - add_submission_status(status.get_error_status) - if File.file?(self.csv_path) - FileUtils.rm(self.csv_path) - end - ensure - self.save - end - end - - if index_properties - raise Exception, "The properties for the submission #{self.ontology.acronym}/submissions/#{self.submissionId} cannot be indexed because it has not been successfully parsed" unless parsed - status = LinkedData::Models::SubmissionStatus.find("INDEXED_PROPERTIES").first - begin - index_properties(logger, index_commit, false) - add_submission_status(status) - rescue Exception => e - logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") - logger.flush - add_submission_status(status.get_error_status) - ensure - self.save - end - end - - if run_metrics - raise Exception, "Metrics cannot be generated on the 
submission #{self.ontology.acronym}/submissions/#{self.submissionId} because it has not been successfully parsed" unless parsed - status = LinkedData::Models::SubmissionStatus.find("METRICS").first - begin - process_metrics(logger) - add_submission_status(status) - rescue Exception => e - logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") - logger.flush - self.metrics = nil - add_submission_status(status.get_error_status) - ensure - self.save - end - end - - if diff - status = LinkedData::Models::SubmissionStatus.find("DIFF").first - # Get previous submission from ontology.submissions - self.ontology.bring(:submissions) - submissions = self.ontology.submissions - - unless submissions.nil? - submissions.each {|s| s.bring(:submissionId, :diffFilePath)} - # Sort submissions in descending order of submissionId, extract last two submissions - recent_submissions = submissions.sort {|a, b| b.submissionId <=> a.submissionId}[0..1] - - if recent_submissions.length > 1 - # validate that the most recent submission is the current submission - if self.submissionId == recent_submissions.first.submissionId - prev = recent_submissions.last - - # Ensure that prev is older than the current submission - if self.submissionId > prev.submissionId - # generate a diff - begin - self.diff(logger, prev) - add_submission_status(status) - rescue Exception => e - logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") - logger.flush - add_submission_status(status.get_error_status) - ensure - self.save - end - end - end - else - logger.info("Bubastis diff: no older submissions available for #{self.id}.") - end - else - logger.info("Bubastis diff: no submissions available for #{self.id}.") - end - end - end - - self.save - logger.info("Submission processing of #{self.id} completed successfully") - logger.flush - ensure - # make sure results get emailed - begin - LinkedData::Utils::Notifications.submission_processed(self) unless archive - rescue StandardError => e - logger.error("Email sending failed: #{e.message}\n#{e.backtrace.join("\n\t")}"); logger.flush - end - end - self - end - - def process_metrics(logger) - metrics = LinkedData::Metrics.metrics_for_submission(self, logger) - metrics.id = RDF::URI.new(self.id.to_s + "/metrics") - exist_metrics = LinkedData::Models::Metric.find(metrics.id).first - exist_metrics.delete if exist_metrics - metrics.save - self.metrics = metrics - self - end - - def index(logger, commit = true, optimize = true) - page = 0 - size = 1000 - count_classes = 0 - - time = Benchmark.realtime do - self.bring(:ontology) if self.bring?(:ontology) - self.ontology.bring(:acronym) if self.ontology.bring?(:acronym) - self.ontology.bring(:provisionalClasses) if self.ontology.bring?(:provisionalClasses) - csv_writer = LinkedData::Utils::OntologyCSVWriter.new - csv_writer.open(self.ontology, self.csv_path) - - begin - logger.info("Indexing ontology terms: #{self.ontology.acronym}...") - t0 = Time.now - self.ontology.unindex(false) - logger.info("Removed ontology terms index (#{Time.now - t0}s)"); logger.flush - - paging = LinkedData::Models::Class.in(self).include(:unmapped).aggregate(:count, :children).page(page, size) - cls_count = class_count(logger) - paging.page_count_set(cls_count) unless cls_count < 0 - total_pages = paging.page(1, size).all.total_pages - num_threads = [total_pages, LinkedData.settings.indexing_num_threads].min - threads = [] - page_classes = nil - - num_threads.times do |num| - threads[num] = Thread.new { - Thread.current["done"] = false - 
Thread.current["page_len"] = -1 - Thread.current["prev_page_len"] = -1 - - while !Thread.current["done"] - synchronize do - page = (page == 0 || page_classes.next?) ? page + 1 : nil - - if page.nil? - Thread.current["done"] = true - else - Thread.current["page"] = page || "nil" - page_classes = paging.page(page, size).all - count_classes += page_classes.length - Thread.current["page_classes"] = page_classes - Thread.current["page_len"] = page_classes.length - Thread.current["t0"] = Time.now - - # nothing retrieved even though we're expecting more records - if total_pages > 0 && page_classes.empty? && (Thread.current["prev_page_len"] == -1 || Thread.current["prev_page_len"] == size) - j = 0 - num_calls = LinkedData.settings.num_retries_4store - - while page_classes.empty? && j < num_calls do - j += 1 - logger.error("Thread #{num + 1}: Empty page encountered. Retrying #{j} times...") - sleep(2) - page_classes = paging.page(page, size).all - logger.info("Thread #{num + 1}: Success retrieving a page of #{page_classes.length} classes after retrying #{j} times...") unless page_classes.empty? - end - - if page_classes.empty? - msg = "Thread #{num + 1}: Empty page #{Thread.current["page"]} of #{total_pages} persisted after retrying #{j} times. Indexing of #{self.id.to_s} aborted..." - logger.error(msg) - raise msg - else - Thread.current["page_classes"] = page_classes - end - end - - if page_classes.empty? - if total_pages > 0 - logger.info("Thread #{num + 1}: The number of pages reported for #{self.id.to_s} - #{total_pages} is higher than expected #{page - 1}. Completing indexing...") - else - logger.info("Thread #{num + 1}: Ontology #{self.id.to_s} contains #{total_pages} pages...") - end - - break - end - - Thread.current["prev_page_len"] = Thread.current["page_len"] - end - end - - break if Thread.current["done"] - - logger.info("Thread #{num + 1}: Page #{Thread.current["page"]} of #{total_pages} - #{Thread.current["page_len"]} ontology terms retrieved in #{Time.now - Thread.current["t0"]} sec.") - Thread.current["t0"] = Time.now - - Thread.current["page_classes"].each do |c| - begin - # this cal is needed for indexing of properties - LinkedData::Models::Class.map_attributes(c, paging.equivalent_predicates) - rescue Exception => e - i = 0 - num_calls = LinkedData.settings.num_retries_4store - success = nil - - while success.nil? && i < num_calls do - i += 1 - logger.error("Thread #{num + 1}: Exception while mapping attributes for #{c.id.to_s}. 
Retrying #{i} times...") - sleep(2) - - begin - LinkedData::Models::Class.map_attributes(c, paging.equivalent_predicates) - logger.info("Thread #{num + 1}: Success mapping attributes for #{c.id.to_s} after retrying #{i} times...") - success = true - rescue Exception => e1 - success = nil - - if i == num_calls - logger.error("Thread #{num + 1}: Error mapping attributes for #{c.id.to_s}:") - logger.error("Thread #{num + 1}: #{e1.class}: #{e1.message} after retrying #{i} times...\n#{e1.backtrace.join("\n\t")}") - logger.flush - end - end - end - end - - synchronize do - csv_writer.write_class(c) - end - end - logger.info("Thread #{num + 1}: Page #{Thread.current["page"]} of #{total_pages} attributes mapped in #{Time.now - Thread.current["t0"]} sec.") - - Thread.current["t0"] = Time.now - LinkedData::Models::Class.indexBatch(Thread.current["page_classes"]) - logger.info("Thread #{num + 1}: Page #{Thread.current["page"]} of #{total_pages} - #{Thread.current["page_len"]} ontology terms indexed in #{Time.now - Thread.current["t0"]} sec.") - logger.flush - end - } - end - - threads.map { |t| t.join } - csv_writer.close - - begin - # index provisional classes - self.ontology.provisionalClasses.each { |pc| pc.index } - rescue Exception => e - logger.error("Error while indexing provisional classes for ontology #{self.ontology.acronym}:") - logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") - logger.flush - end - - if commit - t0 = Time.now - LinkedData::Models::Class.indexCommit() - logger.info("Ontology terms index commit in #{Time.now - t0} sec.") - end - rescue StandardError => e - csv_writer.close - logger.error("\n\n#{e.class}: #{e.message}\n") - logger.error(e.backtrace) - raise e - end - end - logger.info("Completed indexing ontology terms: #{self.ontology.acronym} in #{time} sec. #{count_classes} classes.") - logger.flush - - if optimize - logger.info("Optimizing ontology terms index...") - time = Benchmark.realtime do - LinkedData::Models::Class.indexOptimize() - end - logger.info("Completed optimizing ontology terms index in #{time} sec.") - end - end - - def index_properties(logger, commit = true, optimize = true) - page = 1 - size = 2500 - count_props = 0 - - time = Benchmark.realtime do - self.bring(:ontology) if self.bring?(:ontology) - self.ontology.bring(:acronym) if self.ontology.bring?(:acronym) - logger.info("Indexing ontology properties: #{self.ontology.acronym}...") - t0 = Time.now - self.ontology.unindex_properties(commit) - logger.info("Removed ontology properties index in #{Time.now - t0} seconds."); logger.flush - - props = self.ontology.properties - count_props = props.length - total_pages = (count_props/size.to_f).ceil - logger.info("Indexing a total of #{total_pages} pages of #{size} properties each.") - - props.each_slice(size) do |prop_batch| - t = Time.now - LinkedData::Models::Class.indexBatch(prop_batch, :property) - logger.info("Page #{page} of ontology properties indexed in #{Time.now - t} seconds."); logger.flush - page += 1 - end - - if commit - t0 = Time.now - LinkedData::Models::Class.indexCommit(nil, :property) - logger.info("Ontology properties index commit in #{Time.now - t0} seconds.") - end - end - logger.info("Completed indexing ontology properties of #{self.ontology.acronym} in #{time} sec. 
Total of #{count_props} properties indexed.") - logger.flush - - if optimize - logger.info("Optimizing ontology properties index...") - time = Benchmark.realtime do - LinkedData::Models::Class.indexOptimize(nil, :property) - end - logger.info("Completed optimizing ontology properties index in #{time} seconds.") - end - end - # Override delete to add removal from the search index #TODO: revise this with a better process def delete(*args) @@ -1404,7 +490,7 @@ def delete(*args) self.ontology.bring(:submissions) if self.ontology.submissions.length > 0 - prev_sub = self.ontology.latest_submission() + prev_sub = self.ontology.latest_submission if prev_sub prev_sub.index(LinkedData::Parser.logger || Logger.new($stderr)) @@ -1600,34 +686,6 @@ def parsable?(logger: Logger.new($stdout)) private - - def owlapi_parser_input - path = if zipped? - self.zip_folder - else - self.uploadFilePath - end - File.expand_path(path) - end - - - def owlapi_parser(logger: Logger.new($stdout)) - unzip_submission(logger) - LinkedData::Parser::OWLAPICommand.new( - owlapi_parser_input, - File.expand_path(self.data_folder.to_s), - master_file: self.masterFileName, - logger: logger) - end - - - def delete_and_append(triples_file_path, logger, mime_type = nil) - Goo.sparql_data_client.delete_graph(self.id) - Goo.sparql_data_client.put_triples(self.id, triples_file_path, mime_type) - logger.info("Triples #{triples_file_path} appended in #{self.id.to_ntriples}") - logger.flush - end - def check_http_file(url) session = Net::HTTP.new(url.host, url.port) session.use_ssl = true if url.port == 443 diff --git a/lib/ontologies_linked_data/services/submission_process/operations/submission_archiver.rb b/lib/ontologies_linked_data/services/submission_process/operations/submission_archiver.rb new file mode 100644 index 00000000..1e69df0f --- /dev/null +++ b/lib/ontologies_linked_data/services/submission_process/operations/submission_archiver.rb @@ -0,0 +1,42 @@ +module LinkedData + module Services + class OntologySubmissionArchiver < OntologySubmissionProcess + + FILES_TO_DELETE = %w[labels.ttl mappings.ttl obsolete.ttl owlapi.xrdf errors.log] + + + def process + submission_archive + end + + private + def submission_archive + @submission.submissionStatus = nil + status = LinkedData::Models::SubmissionStatus.find("ARCHIVED").first + @submission.add_submission_status(status) + + + # Delete everything except for original ontology file. + @submission.ontology.bring(:submissions) + submissions = @submission.ontology.submissions + unless submissions.nil? + submissions.each { |s| s.bring(:submissionId) } + submission = submissions.sort { |a, b| b.submissionId <=> a.submissionId }.first + # Don't perform deletion if this is the most recent submission. + delete_old_submission_files if @submission.submissionId < submission.submissionId + end + end + + def delete_old_submission_files + path_to_repo = @submission.data_folder + submission_files = FILES_TO_DELETE.map { |f| File.join(path_to_repo, f) } + submission_files.push(@submission.csv_path) + submission_files.push(@submission.parsing_log_path) unless @submission.parsing_log_path.nil? 
+ FileUtils.rm(submission_files, force: true) + end + + end + + + end +end diff --git a/lib/ontologies_linked_data/services/submission_process/operations/submission_diff_generator.rb b/lib/ontologies_linked_data/services/submission_process/operations/submission_diff_generator.rb new file mode 100644 index 00000000..b6dda351 --- /dev/null +++ b/lib/ontologies_linked_data/services/submission_process/operations/submission_diff_generator.rb @@ -0,0 +1,86 @@ +module LinkedData + module Services + class SubmissionDiffGenerator < OntologySubmissionProcess + + def process(logger, options = nil) + process_diff(logger) + end + + def diff(logger, older) + generate_diff(logger, init_diff_tool(older)) + end + + private + + # accepts another submission in 'older' (it should be an 'older' ontology version) + def init_diff_tool(older) + @submission.bring(:uploadFilePath) + older.bring(:uploadFilePath) + + LinkedData::Diff::BubastisDiffCommand.new( + File.expand_path(older.uploadFilePath), + File.expand_path(@submission.uploadFilePath)) + end + + def process_diff(logger) + status = LinkedData::Models::SubmissionStatus.find('DIFF').first + # Get previous submission from ontology.submissions + @submission.ontology.bring(:submissions) + submissions = @submission.ontology.submissions + + if submissions.nil? + logger.info("Diff process: no submissions available for #{@submission.id}.") + else + submissions.each { |s| s.bring(:submissionId, :diffFilePath) } + # Sort submissions in descending order of submissionId, extract last two submissions + recent_submissions = submissions.sort { |a, b| b.submissionId <=> a.submissionId }[0..1] + + if recent_submissions.length > 1 + # validate that the most recent submission is the current submission + if @submission.submissionId == recent_submissions.first.submissionId + prev = recent_submissions.last + + # Ensure that prev is older than the current submission + if @submission.submissionId > prev.submissionId + # generate a diff + begin + diff(logger,prev) + @submission.add_submission_status(status) + rescue Exception => e + logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") + logger.flush + @submission.add_submission_status(status.get_error_status) + ensure + @submission.save + end + end + end + else + logger.info("Diff process: no older submissions available for #{@submission.id}.") + end + end + end + + + def generate_diff(logger, diff_tool) + begin + @submission.bring_remaining + @submission.bring(:diffFilePath) + + LinkedData::Diff.logger = logger + @submission.diffFilePath = diff_tool.diff + @submission.save + logger.info("Diff generated successfully for #{@submission.id}") + logger.flush + rescue StoreError => e + logger.error("Diff process for #{@submission.id} failed - #{e.class}: #{e.message}") + logger.flush + raise e + end + end + + end + end +end + + diff --git a/lib/ontologies_linked_data/services/submission_process/operations/submission_indexer.rb b/lib/ontologies_linked_data/services/submission_process/operations/submission_indexer.rb new file mode 100644 index 00000000..137a3219 --- /dev/null +++ b/lib/ontologies_linked_data/services/submission_process/operations/submission_indexer.rb @@ -0,0 +1,199 @@ +module LinkedData + module Services + class OntologySubmissionIndexer < OntologySubmissionProcess + + def process(logger, options = nil) + process_indexation(logger, options) + end + + private + + def process_indexation(logger, options) + + status = LinkedData::Models::SubmissionStatus.find('INDEXED').first + begin + index(logger, 
options[:commit], false) + @submission.add_submission_status(status) + rescue Exception => e + logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") + logger.flush + @submission.add_submission_status(status.get_error_status) + if File.file?(@submission.csv_path) + FileUtils.rm(@submission.csv_path) + end + ensure + @submission.save + end + end + + def index(logger, commit = true, optimize = true) + page = 0 + size = 1000 + count_classes = 0 + + time = Benchmark.realtime do + @submission.bring(:ontology) if @submission.bring?(:ontology) + @submission.ontology.bring(:acronym) if @submission.ontology.bring?(:acronym) + @submission.ontology.bring(:provisionalClasses) if @submission.ontology.bring?(:provisionalClasses) + csv_writer = LinkedData::Utils::OntologyCSVWriter.new + csv_writer.open(@submission.ontology, @submission.csv_path) + + begin + logger.info("Indexing ontology terms: #{@submission.ontology.acronym}...") + t0 = Time.now + @submission.ontology.unindex(false) + logger.info("Removed ontology terms index (#{Time.now - t0}s)"); logger.flush + + paging = LinkedData::Models::Class.in(@submission).include(:unmapped).aggregate(:count, :children).page(page, size) + cls_count = @submission.class_count(logger) + paging.page_count_set(cls_count) unless cls_count < 0 + total_pages = paging.page(1, size).all.total_pages + num_threads = [total_pages, LinkedData.settings.indexing_num_threads].min + threads = [] + page_classes = nil + + num_threads.times do |num| + threads[num] = Thread.new { + Thread.current['done'] = false + Thread.current['page_len'] = -1 + Thread.current['prev_page_len'] = -1 + + while !Thread.current['done'] + @submission.synchronize do + page = (page == 0 || page_classes.next?) ? page + 1 : nil + + if page.nil? + Thread.current['done'] = true + else + Thread.current['page'] = page || 'nil' + page_classes = paging.page(page, size).all + count_classes += page_classes.length + Thread.current['page_classes'] = page_classes + Thread.current['page_len'] = page_classes.length + Thread.current['t0'] = Time.now + + # nothing retrieved even though we're expecting more records + if total_pages > 0 && page_classes.empty? && (Thread.current['prev_page_len'] == -1 || Thread.current['prev_page_len'] == size) + j = 0 + num_calls = LinkedData.settings.num_retries_4store + + while page_classes.empty? && j < num_calls do + j += 1 + logger.error("Thread #{num + 1}: Empty page encountered. Retrying #{j} times...") + sleep(2) + page_classes = paging.page(page, size).all + logger.info("Thread #{num + 1}: Success retrieving a page of #{page_classes.length} classes after retrying #{j} times...") unless page_classes.empty? + end + + if page_classes.empty? + msg = "Thread #{num + 1}: Empty page #{Thread.current["page"]} of #{total_pages} persisted after retrying #{j} times. Indexing of #{@submission.id.to_s} aborted..." + logger.error(msg) + raise msg + else + Thread.current['page_classes'] = page_classes + end + end + + if page_classes.empty? + if total_pages > 0 + logger.info("Thread #{num + 1}: The number of pages reported for #{@submission.id.to_s} - #{total_pages} is higher than expected #{page - 1}. 
Completing indexing...") + else + logger.info("Thread #{num + 1}: Ontology #{@submission.id.to_s} contains #{total_pages} pages...") + end + + break + end + + Thread.current['prev_page_len'] = Thread.current['page_len'] + end + end + + break if Thread.current['done'] + + logger.info("Thread #{num + 1}: Page #{Thread.current["page"]} of #{total_pages} - #{Thread.current["page_len"]} ontology terms retrieved in #{Time.now - Thread.current["t0"]} sec.") + Thread.current['t0'] = Time.now + + Thread.current['page_classes'].each do |c| + begin + # this cal is needed for indexing of properties + LinkedData::Models::Class.map_attributes(c, paging.equivalent_predicates) + rescue Exception => e + i = 0 + num_calls = LinkedData.settings.num_retries_4store + success = nil + + while success.nil? && i < num_calls do + i += 1 + logger.error("Thread #{num + 1}: Exception while mapping attributes for #{c.id.to_s}. Retrying #{i} times...") + sleep(2) + + begin + LinkedData::Models::Class.map_attributes(c, paging.equivalent_predicates) + logger.info("Thread #{num + 1}: Success mapping attributes for #{c.id.to_s} after retrying #{i} times...") + success = true + rescue Exception => e1 + success = nil + + if i == num_calls + logger.error("Thread #{num + 1}: Error mapping attributes for #{c.id.to_s}:") + logger.error("Thread #{num + 1}: #{e1.class}: #{e1.message} after retrying #{i} times...\n#{e1.backtrace.join("\n\t")}") + logger.flush + end + end + end + end + + @submission.synchronize do + csv_writer.write_class(c) + end + end + logger.info("Thread #{num + 1}: Page #{Thread.current["page"]} of #{total_pages} attributes mapped in #{Time.now - Thread.current["t0"]} sec.") + + Thread.current['t0'] = Time.now + LinkedData::Models::Class.indexBatch(Thread.current['page_classes']) + logger.info("Thread #{num + 1}: Page #{Thread.current["page"]} of #{total_pages} - #{Thread.current["page_len"]} ontology terms indexed in #{Time.now - Thread.current["t0"]} sec.") + logger.flush + end + } + end + + threads.map { |t| t.join } + csv_writer.close + + begin + # index provisional classes + @submission.ontology.provisionalClasses.each { |pc| pc.index } + rescue Exception => e + logger.error("Error while indexing provisional classes for ontology #{@submission.ontology.acronym}:") + logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") + logger.flush + end + + if commit + t0 = Time.now + LinkedData::Models::Class.indexCommit() + logger.info("Ontology terms index commit in #{Time.now - t0} sec.") + end + rescue StandardError => e + csv_writer.close + logger.error("\n\n#{e.class}: #{e.message}\n") + logger.error(e.backtrace) + raise e + end + end + logger.info("Completed indexing ontology terms: #{@submission.ontology.acronym} in #{time} sec. 
#{count_classes} classes.") + logger.flush + + if optimize + logger.info('Optimizing ontology terms index...') + time = Benchmark.realtime do + LinkedData::Models::Class.indexOptimize() + end + logger.info("Completed optimizing ontology terms index in #{time} sec.") + end + end + + end + end +end + diff --git a/lib/ontologies_linked_data/services/submission_process/operations/submission_mertrics_calculator.rb b/lib/ontologies_linked_data/services/submission_process/operations/submission_mertrics_calculator.rb new file mode 100644 index 00000000..411c8194 --- /dev/null +++ b/lib/ontologies_linked_data/services/submission_process/operations/submission_mertrics_calculator.rb @@ -0,0 +1,102 @@ +module LinkedData + module Services + class SubmissionMetricsCalculator < OntologySubmissionProcess + def process(logger, options = nil) + process_metrics(logger) + end + + def generate_umls_metrics_file(tr_file_path=nil) + tr_file_path ||= @submission.triples_file_path + class_count = 0 + indiv_count = 0 + prop_count = 0 + + File.foreach(tr_file_path) do |line| + class_count += 1 if line =~ /owl:Class/ + indiv_count += 1 if line =~ /owl:NamedIndividual/ + prop_count += 1 if line =~ /owl:ObjectProperty/ + prop_count += 1 if line =~ /owl:DatatypeProperty/ + end + generate_metrics_file(class_count, indiv_count, prop_count) + end + + private + + def process_metrics(logger) + status = LinkedData::Models::SubmissionStatus.find('METRICS').first + begin + compute_metrics(logger) + @submission.add_submission_status(status) + rescue StandardError => e + logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") + logger.flush + @submission.metrics = nil + @submission.add_submission_status(status.get_error_status) + ensure + @submission.save + end + end + + def compute_metrics(logger) + metrics = metrics_for_submission(logger) + metrics.id = RDF::URI.new(@submission.id.to_s + '/metrics') + exist_metrics = LinkedData::Models::Metric.find(metrics.id).first + exist_metrics.delete if exist_metrics + metrics.save + @submission.metrics = metrics + @submission + end + + def metrics_for_submission(logger) + logger.info('metrics_for_submission start') + logger.flush + begin + @submission.bring(:submissionStatus) if @submission.bring?(:submissionStatus) + cls_metrics = LinkedData::Metrics.class_metrics(@submission, logger) + logger.info('class_metrics finished') + logger.flush + metrics = LinkedData::Models::Metric.new + + cls_metrics.each do |k,v| + unless v.instance_of?(Integer) + begin + v = Integer(v) + rescue ArgumentError + v = 0 + rescue TypeError + v = 0 + end + end + metrics.send("#{k}=",v) + end + indiv_count = LinkedData::Metrics.number_individuals(logger, @submission) + metrics.individuals = indiv_count + logger.info('individuals finished') + logger.flush + prop_count = LinkedData::Metrics.number_properties(logger, @submission) + metrics.properties = prop_count + logger.info('properties finished') + logger.flush + # re-generate metrics file + generate_metrics_file(cls_metrics[:classes], indiv_count, prop_count) + logger.info('generation of metrics file finished') + logger.flush + rescue StandardError => e + logger.error(e.message) + logger.error(e) + logger.flush + metrics = nil + end + metrics + end + + def generate_metrics_file(class_count, indiv_count, prop_count) + CSV.open(@submission.metrics_path, 'wb') do |csv| + csv << ['Class Count', 'Individual Count', 'Property Count'] + csv << [class_count, indiv_count, prop_count] + end + end + + end + end +end diff --git 
a/lib/ontologies_linked_data/services/submission_process/operations/submission_properties_indexer.rb b/lib/ontologies_linked_data/services/submission_process/operations/submission_properties_indexer.rb new file mode 100644 index 00000000..beefd048 --- /dev/null +++ b/lib/ontologies_linked_data/services/submission_process/operations/submission_properties_indexer.rb @@ -0,0 +1,70 @@ +module LinkedData + module Services + class SubmissionPropertiesIndexer < OntologySubmissionProcess + + def process(logger, options = nil) + process_indexation(logger, options) + end + + private + + def process_indexation(logger, options) + status = LinkedData::Models::SubmissionStatus.find('INDEXED_PROPERTIES').first + begin + index_properties(logger, commit: options[:commit], optimize: false) + @submission.add_submission_status(status) + rescue StandardError => e + logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") + logger.flush + @submission.add_submission_status(status.get_error_status) + ensure + @submission.save + end + end + + def index_properties(logger, commit: true, optimize: true) + page = 1 + size = 2500 + count_props = 0 + + time = Benchmark.realtime do + @submission.bring(:ontology) if @submission.bring?(:ontology) + @submission.ontology.bring(:acronym) if @submission.ontology.bring?(:acronym) + logger.info("Indexing ontology properties: #{@submission.ontology.acronym}...") + t0 = Time.now + @submission.ontology.unindex_properties(commit) + logger.info("Removed ontology properties index in #{Time.now - t0} seconds."); logger.flush + + props = @submission.ontology.properties + count_props = props.length + total_pages = (count_props/size.to_f).ceil + logger.info("Indexing a total of #{total_pages} pages of #{size} properties each.") + + props.each_slice(size) do |prop_batch| + t = Time.now + LinkedData::Models::Class.indexBatch(prop_batch, :property) + logger.info("Page #{page} of ontology properties indexed in #{Time.now - t} seconds."); logger.flush + page += 1 + end + + if commit + t0 = Time.now + LinkedData::Models::Class.indexCommit(nil, :property) + logger.info("Ontology properties index commit in #{Time.now - t0} seconds.") + end + end + logger.info("Completed indexing ontology properties of #{@submission.ontology.acronym} in #{time} sec. 
Total of #{count_props} properties indexed.") + logger.flush + + if optimize + logger.info('Optimizing ontology properties index...') + time = Benchmark.realtime do + LinkedData::Models::Class.indexOptimize(nil, :property) + end + logger.info("Completed optimizing ontology properties index in #{time} seconds.") + end + end + end + end +end + diff --git a/lib/ontologies_linked_data/services/submission_process/operations/submission_rdf_generator.rb b/lib/ontologies_linked_data/services/submission_process/operations/submission_rdf_generator.rb new file mode 100644 index 00000000..d0dbac69 --- /dev/null +++ b/lib/ontologies_linked_data/services/submission_process/operations/submission_rdf_generator.rb @@ -0,0 +1,705 @@ +module LinkedData + module Services + + class MissingLabelsHandler < OntologySubmissionProcess + + def process(logger, options = {}) + handle_missing_labels(options[:file_path], logger) + end + + private + + def handle_missing_labels(file_path, logger) + callbacks = { + missing_labels: { + op_name: 'Missing Labels Generation', + required: true, + status: LinkedData::Models::SubmissionStatus.find('RDF_LABELS').first, + artifacts: { + file_path: file_path + }, + caller_on_pre: :generate_missing_labels_pre, + caller_on_pre_page: :generate_missing_labels_pre_page, + caller_on_each: :generate_missing_labels_each, + caller_on_post_page: :generate_missing_labels_post_page, + caller_on_post: :generate_missing_labels_post + } + } + + raw_paging = LinkedData::Models::Class.in(@submission).include(:prefLabel, :synonym, :label) + loop_classes(logger, raw_paging, @submission, callbacks) + end + + def process_callbacks(logger, callbacks, action_name) + callbacks.delete_if do |_, callback| + begin + if callback[action_name] + callable = self.method(callback[action_name]) + yield(callable, callback) + end + false + rescue Exception => e + logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") + logger.flush + + if callback[:status] + @submission.add_submission_status(callback[:status].get_error_status) + @submission.save + end + + # halt the entire processing if :required is set to true + raise e if callback[:required] + # continue processing of other callbacks, but not this one + true + end + end + end + + def loop_classes(logger, raw_paging, submission, callbacks) + page = 1 + size = 2500 + count_classes = 0 + acr = submission.id.to_s.split("/")[-1] + operations = callbacks.values.map { |v| v[:op_name] }.join(", ") + + time = Benchmark.realtime do + paging = raw_paging.page(page, size) + cls_count_set = false + cls_count = submission.class_count(logger) + + if cls_count > -1 + # prevent a COUNT SPARQL query if possible + paging.page_count_set(cls_count) + cls_count_set = true + else + cls_count = 0 + end + + iterate_classes = false + # 1. init artifacts hash if not explicitly passed in the callback + # 2. determine if class-level iteration is required + callbacks.each { |_, callback| callback[:artifacts] ||= {}; + if callback[:caller_on_each] + iterate_classes = true + end } + + process_callbacks(logger, callbacks, :caller_on_pre) { + |callable, callback| callable.call(callback[:artifacts], logger, paging) } + + page_len = -1 + prev_page_len = -1 + + begin + t0 = Time.now + page_classes = paging.page(page, size).all + total_pages = page_classes.total_pages + page_len = page_classes.length + + # nothing retrieved even though we're expecting more records + if total_pages > 0 && page_classes.empty? 
&& (prev_page_len == -1 || prev_page_len == size) + j = 0 + num_calls = LinkedData.settings.num_retries_4store + + while page_classes.empty? && j < num_calls do + j += 1 + logger.error("Empty page encountered. Retrying #{j} times...") + sleep(2) + page_classes = paging.page(page, size).all + unless page_classes.empty? + logger.info("Success retrieving a page of #{page_classes.length} classes after retrying #{j} times...") + end + end + + if page_classes.empty? + msg = "Empty page #{page} of #{total_pages} persisted after retrying #{j} times. #{operations} of #{acr} aborted..." + logger.error(msg) + raise msg + end + end + + if page_classes.empty? + if total_pages > 0 + logger.info("The number of pages reported for #{acr} - #{total_pages} is higher than expected #{page - 1}. Completing #{operations}...") + else + logger.info("Ontology #{acr} contains #{total_pages} pages...") + end + break + end + + prev_page_len = page_len + logger.info("#{acr}: page #{page} of #{total_pages} - #{page_len} ontology terms retrieved in #{Time.now - t0} sec.") + logger.flush + count_classes += page_classes.length + + process_callbacks(logger, callbacks, :caller_on_pre_page) { + |callable, callback| callable.call(callback[:artifacts], logger, paging, page_classes, page) } + + page_classes.each { |c| + process_callbacks(logger, callbacks, :caller_on_each) { + |callable, callback| callable.call(callback[:artifacts], logger, paging, page_classes, page, c) } + } if iterate_classes + + process_callbacks(logger, callbacks, :caller_on_post_page) { + |callable, callback| callable.call(callback[:artifacts], logger, paging, page_classes, page) } + cls_count += page_classes.length unless cls_count_set + + page = page_classes.next? ? page + 1 : nil + end while !page.nil? + + callbacks.each { |_, callback| callback[:artifacts][:count_classes] = cls_count } + process_callbacks(logger, callbacks, :caller_on_post) { + |callable, callback| callable.call(callback[:artifacts], logger, paging) } + end + + logger.info("Completed #{operations}: #{acr} in #{time} sec. #{count_classes} classes.") + logger.flush + + # set the status on actions that have completed successfully + callbacks.each do |_, callback| + if callback[:status] + @submission.add_submission_status(callback[:status]) + @submission.save + end + end + end + + def generate_missing_labels_pre(artifacts = {}, logger, paging) + file_path = artifacts[:file_path] + artifacts[:save_in_file] = File.join(File.dirname(file_path), "labels.ttl") + artifacts[:save_in_file_mappings] = File.join(File.dirname(file_path), "mappings.ttl") + property_triples = LinkedData::Utils::Triples.rdf_for_custom_properties(@submission) + Goo.sparql_data_client.append_triples(@submission.id, property_triples, mime_type = "application/x-turtle") + fsave = File.open(artifacts[:save_in_file], "w") + fsave.write(property_triples) + fsave_mappings = File.open(artifacts[:save_in_file_mappings], "w") + artifacts[:fsave] = fsave + artifacts[:fsave_mappings] = fsave_mappings + end + + def generate_missing_labels_pre_page(artifacts = {}, logger, paging, page_classes, page) + artifacts[:label_triples] = [] + artifacts[:mapping_triples] = [] + end + + def generate_missing_labels_each(artifacts = {}, logger, paging, page_classes, page, c) + prefLabel = nil + + if c.prefLabel.nil? + rdfs_labels = c.label + + if rdfs_labels && rdfs_labels.length > 1 && c.synonym.length > 0 + rdfs_labels = (Set.new(c.label) - Set.new(c.synonym)).to_a.first + + rdfs_labels = c.label if rdfs_labels.nil? 
|| rdfs_labels.length == 0 + end + + rdfs_labels = [rdfs_labels] if rdfs_labels and not (rdfs_labels.instance_of? Array) + label = nil + + if rdfs_labels && rdfs_labels.length > 0 + label = rdfs_labels[0] + else + label = LinkedData::Utils::Triples.last_iri_fragment c.id.to_s + end + artifacts[:label_triples] << LinkedData::Utils::Triples.label_for_class_triple( + c.id, Goo.vocabulary(:metadata_def)[:prefLabel], label) + prefLabel = label + else + prefLabel = c.prefLabel + end + + if @submission.ontology.viewOf.nil? + loomLabel = LinkedData::Models::OntologySubmission.loom_transform_literal(prefLabel.to_s) + + if loomLabel.length > 2 + artifacts[:mapping_triples] << LinkedData::Utils::Triples.loom_mapping_triple( + c.id, Goo.vocabulary(:metadata_def)[:mappingLoom], loomLabel) + end + artifacts[:mapping_triples] << LinkedData::Utils::Triples.uri_mapping_triple( + c.id, Goo.vocabulary(:metadata_def)[:mappingSameURI], c.id) + end + end + + def generate_missing_labels_post_page(artifacts = {}, logger, paging, page_classes, page) + rest_mappings = LinkedData::Mappings.migrate_rest_mappings(@submission.ontology.acronym) + artifacts[:mapping_triples].concat(rest_mappings) + + if artifacts[:label_triples].length > 0 + logger.info("Asserting #{artifacts[:label_triples].length} labels in " + + "#{@submission.id.to_ntriples}") + logger.flush + artifacts[:label_triples] = artifacts[:label_triples].join("\n") + artifacts[:fsave].write(artifacts[:label_triples]) + t0 = Time.now + Goo.sparql_data_client.append_triples(@submission.id, artifacts[:label_triples], mime_type = "application/x-turtle") + t1 = Time.now + logger.info("Labels asserted in #{t1 - t0} sec.") + logger.flush + else + logger.info("No labels generated in page #{page}.") + logger.flush + end + + if artifacts[:mapping_triples].length > 0 + logger.info("Asserting #{artifacts[:mapping_triples].length} mappings in " + + "#{@submission.id.to_ntriples}") + logger.flush + artifacts[:mapping_triples] = artifacts[:mapping_triples].join("\n") + artifacts[:fsave_mappings].write(artifacts[:mapping_triples]) + + t0 = Time.now + Goo.sparql_data_client.append_triples(@submission.id, artifacts[:mapping_triples], mime_type = "application/x-turtle") + t1 = Time.now + logger.info("Mapping labels asserted in #{t1 - t0} sec.") + logger.flush + end + end + + def generate_missing_labels_post(artifacts = {}, logger, pagging) + logger.info("end generate_missing_labels traversed #{artifacts[:count_classes]} classes") + logger.info("Saved generated labels in #{artifacts[:save_in_file]}") + artifacts[:fsave].close() + artifacts[:fsave_mappings].close() + logger.flush + end + + end + + class SubmissionRDFGenerator < OntologySubmissionProcess + + def process(logger, options) + process_rdf(logger, options[:reasoning]) + end + + private + + def process_rdf(logger, reasoning) + # Remove processing status types before starting RDF parsing etc. + @submission.submissionStatus = nil + status = LinkedData::Models::SubmissionStatus.find('UPLOADED').first + @submission.add_submission_status(status) + @submission.save + + # Parse RDF + begin + unless @submission.valid? + error = 'Submission is not valid, it cannot be processed. Check errors.' + raise ArgumentError, error + end + unless @submission.uploadFilePath + error = 'Submission is missing an ontology file, cannot parse.' 
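# Illustrative sketch, not part of the patch: the prefLabel fallback that
# generate_missing_labels_each (above) implements, distilled into a single helper.
# The order is: an rdfs:label that is not also a synonym, then any rdfs:label,
# then the last fragment of the class IRI. `cls` is a hypothetical object exposing
# #prefLabel, #label, #synonym and #id like the Goo class models used here.
def fallback_pref_label(cls)
  return cls.prefLabel unless cls.prefLabel.nil?

  labels       = Array(cls.label)
  non_synonyms = labels - Array(cls.synonym)
  chosen       = non_synonyms.first || labels.first
  chosen || LinkedData::Utils::Triples.last_iri_fragment(cls.id.to_s)
end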
+ raise ArgumentError, error + end + status = LinkedData::Models::SubmissionStatus.find('RDF').first + @submission.remove_submission_status(status) #remove RDF status before starting + zip_dst = @submission.unzip_submission(logger) + file_path = zip_dst ? zip_dst.to_s : @submission.uploadFilePath.to_s + generate_rdf(logger, file_path, reasoning: reasoning) + @submission.add_submission_status(status) + @submission.save + rescue Exception => e + logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") + logger.flush + @submission.add_submission_status(status.get_error_status) + @submission.save + # If RDF generation fails, no point of continuing + raise e + end + + MissingLabelsHandler.new(@submission).process(logger, file_path: file_path) + + status = LinkedData::Models::SubmissionStatus.find('OBSOLETE').first + begin + generate_obsolete_classes(logger, file_path) + @submission.add_submission_status(status) + @submission.save + rescue Exception => e + logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") + logger.flush + @submission.add_submission_status(status.get_error_status) + @submission.save + # if obsolete fails the parsing fails + raise e + end + end + + def generate_rdf(logger, file_path, reasoning: true) + mime_type = nil + + if @submission.hasOntologyLanguage.umls? + triples_file_path = @submission.triples_file_path + logger.info("Using UMLS turtle file found, skipping OWLAPI parse") + logger.flush + mime_type = LinkedData::MediaTypes.media_type_from_base(LinkedData::MediaTypes::TURTLE) + SubmissionMetricsCalculator.new(@submission).generate_umls_metrics_file(triples_file_path) + else + output_rdf = @submission.rdf_path + + if File.exist?(output_rdf) + logger.info("deleting old owlapi.xrdf ..") + deleted = FileUtils.rm(output_rdf) + + if deleted.length > 0 + logger.info("deleted") + else + logger.info("error deleting owlapi.rdf") + end + end + owlapi = LinkedData::Parser::OWLAPICommand.new( + File.expand_path(file_path), + File.expand_path(@submission.data_folder.to_s), + master_file: @submission.masterFileName) + + if !reasoning + owlapi.disable_reasoner + end + triples_file_path, missing_imports = owlapi.parse + + if missing_imports && missing_imports.length > 0 + @submission.missingImports = missing_imports + + missing_imports.each do |imp| + logger.info("OWL_IMPORT_MISSING: #{imp}") + end + else + @submission.missingImports = nil + end + logger.flush + end + + begin + delete_and_append(triples_file_path, logger, mime_type) + rescue => e + logger.error("Error sending data to triple store - #{e.response.code} #{e.class}: #{e.response.body}") if e.response&.body + raise e + end + version_info = extract_version + + if version_info + @submission.version = version_info + end + end + + + + def delete_and_append(triples_file_path, logger, mime_type = nil) + Goo.sparql_data_client.delete_graph(@submission.id) + Goo.sparql_data_client.put_triples(@submission.id, triples_file_path, mime_type) + logger.info("Triples #{triples_file_path} appended in #{@submission.id.to_ntriples}") + logger.flush + end + + + + + def extract_version + + query_version_info = < + ?versionInfo . 
+} +eos + Goo.sparql_query_client.query(query_version_info).each_solution do |sol| + return sol[:versionInfo].to_s + end + return nil + end + + def process_callbacks(logger, callbacks, action_name, &block) + callbacks.delete_if do |_, callback| + begin + if callback[action_name] + callable = @submission.method(callback[action_name]) + yield(callable, callback) + end + false + rescue Exception => e + logger.error("#{e.class}: #{e.message}\n#{e.backtrace.join("\n\t")}") + logger.flush + + if callback[:status] + add_submission_status(callback[:status].get_error_status) + @submission.save + end + + # halt the entire processing if :required is set to true + raise e if callback[:required] + # continue processing of other callbacks, but not this one + true + end + end + end + + def loop_classes(logger, raw_paging, callbacks) + page = 1 + size = 2500 + count_classes = 0 + acr = @submission.id.to_s.split("/")[-1] + operations = callbacks.values.map { |v| v[:op_name] }.join(", ") + + time = Benchmark.realtime do + paging = raw_paging.page(page, size) + cls_count_set = false + cls_count = class_count(logger) + + if cls_count > -1 + # prevent a COUNT SPARQL query if possible + paging.page_count_set(cls_count) + cls_count_set = true + else + cls_count = 0 + end + + iterate_classes = false + # 1. init artifacts hash if not explicitly passed in the callback + # 2. determine if class-level iteration is required + callbacks.each { |_, callback| callback[:artifacts] ||= {}; iterate_classes = true if callback[:caller_on_each] } + + process_callbacks(logger, callbacks, :caller_on_pre) { + |callable, callback| callable.call(callback[:artifacts], logger, paging) } + + page_len = -1 + prev_page_len = -1 + + begin + t0 = Time.now + page_classes = paging.page(page, size).all + total_pages = page_classes.total_pages + page_len = page_classes.length + + # nothing retrieved even though we're expecting more records + if total_pages > 0 && page_classes.empty? && (prev_page_len == -1 || prev_page_len == size) + j = 0 + num_calls = LinkedData.settings.num_retries_4store + + while page_classes.empty? && j < num_calls do + j += 1 + logger.error("Empty page encountered. Retrying #{j} times...") + sleep(2) + page_classes = paging.page(page, size).all + logger.info("Success retrieving a page of #{page_classes.length} classes after retrying #{j} times...") unless page_classes.empty? + end + + if page_classes.empty? + msg = "Empty page #{page} of #{total_pages} persisted after retrying #{j} times. #{operations} of #{acr} aborted..." + logger.error(msg) + raise msg + end + end + + if page_classes.empty? + if total_pages > 0 + logger.info("The number of pages reported for #{acr} - #{total_pages} is higher than expected #{page - 1}. 
Completing #{operations}...") + else + logger.info("Ontology #{acr} contains #{total_pages} pages...") + end + break + end + + prev_page_len = page_len + logger.info("#{acr}: page #{page} of #{total_pages} - #{page_len} ontology terms retrieved in #{Time.now - t0} sec.") + logger.flush + count_classes += page_classes.length + + process_callbacks(logger, callbacks, :caller_on_pre_page) { + |callable, callback| callable.call(callback[:artifacts], logger, paging, page_classes, page) } + + page_classes.each { |c| + process_callbacks(logger, callbacks, :caller_on_each) { + |callable, callback| callable.call(callback[:artifacts], logger, paging, page_classes, page, c) } + } if iterate_classes + + process_callbacks(logger, callbacks, :caller_on_post_page) { + |callable, callback| callable.call(callback[:artifacts], logger, paging, page_classes, page) } + cls_count += page_classes.length unless cls_count_set + + page = page_classes.next? ? page + 1 : nil + end while !page.nil? + + callbacks.each { |_, callback| callback[:artifacts][:count_classes] = cls_count } + process_callbacks(logger, callbacks, :caller_on_post) { + |callable, callback| callable.call(callback[:artifacts], logger, paging) } + end + + logger.info("Completed #{operations}: #{acr} in #{time} sec. #{count_classes} classes.") + logger.flush + + # set the status on actions that have completed successfully + callbacks.each do |_, callback| + if callback[:status] + add_submission_status(callback[:status]) + @submission.save + end + end + end + + def generate_missing_labels_pre(artifacts = {}, logger, paging) + file_path = artifacts[:file_path] + artifacts[:save_in_file] = File.join(File.dirname(file_path), "labels.ttl") + artifacts[:save_in_file_mappings] = File.join(File.dirname(file_path), "mappings.ttl") + property_triples = LinkedData::Utils::Triples.rdf_for_custom_properties(@submission) + Goo.sparql_data_client.append_triples(@submission.id, property_triples, mime_type = "application/x-turtle") + fsave = File.open(artifacts[:save_in_file], "w") + fsave.write(property_triples) + fsave_mappings = File.open(artifacts[:save_in_file_mappings], "w") + artifacts[:fsave] = fsave + artifacts[:fsave_mappings] = fsave_mappings + end + + def generate_missing_labels_pre_page(artifacts = {}, logger, paging, page_classes, page) + artifacts[:label_triples] = [] + artifacts[:mapping_triples] = [] + end + + def generate_missing_labels_each(artifacts = {}, logger, paging, page_classes, page, c) + prefLabel = nil + + if c.prefLabel.nil? + rdfs_labels = c.label + + if rdfs_labels && rdfs_labels.length > 1 && c.synonym.length > 0 + rdfs_labels = (Set.new(c.label) - Set.new(c.synonym)).to_a.first + + if rdfs_labels.nil? || rdfs_labels.length == 0 + rdfs_labels = c.label + end + end + + if rdfs_labels and not (rdfs_labels.instance_of? Array) + rdfs_labels = [rdfs_labels] + end + label = nil + + if rdfs_labels && rdfs_labels.length > 0 + label = rdfs_labels[0] + else + label = LinkedData::Utils::Triples.last_iri_fragment c.id.to_s + end + artifacts[:label_triples] << LinkedData::Utils::Triples.label_for_class_triple( + c.id, Goo.vocabulary(:metadata_def)[:prefLabel], label) + prefLabel = label + else + prefLabel = c.prefLabel + end + + if @submission.ontology.viewOf.nil? 
+ loomLabel = OntologySubmission.loom_transform_literal(prefLabel.to_s) + + if loomLabel.length > 2 + artifacts[:mapping_triples] << LinkedData::Utils::Triples.loom_mapping_triple( + c.id, Goo.vocabulary(:metadata_def)[:mappingLoom], loomLabel) + end + artifacts[:mapping_triples] << LinkedData::Utils::Triples.uri_mapping_triple( + c.id, Goo.vocabulary(:metadata_def)[:mappingSameURI], c.id) + end + end + + def generate_missing_labels_post_page(artifacts = {}, logger, paging, page_classes, page) + rest_mappings = LinkedData::Mappings.migrate_rest_mappings(@submission.ontology.acronym) + artifacts[:mapping_triples].concat(rest_mappings) + + if artifacts[:label_triples].length > 0 + logger.info("Asserting #{artifacts[:label_triples].length} labels in " + + "#{@submission.id.to_ntriples}") + logger.flush + artifacts[:label_triples] = artifacts[:label_triples].join("\n") + artifacts[:fsave].write(artifacts[:label_triples]) + t0 = Time.now + Goo.sparql_data_client.append_triples(@submission.id, artifacts[:label_triples], mime_type = "application/x-turtle") + t1 = Time.now + logger.info("Labels asserted in #{t1 - t0} sec.") + logger.flush + else + logger.info("No labels generated in page #{page}.") + logger.flush + end + + if artifacts[:mapping_triples].length > 0 + logger.info("Asserting #{artifacts[:mapping_triples].length} mappings in " + + "#{@submission.id.to_ntriples}") + logger.flush + artifacts[:mapping_triples] = artifacts[:mapping_triples].join("\n") + artifacts[:fsave_mappings].write(artifacts[:mapping_triples]) + + t0 = Time.now + Goo.sparql_data_client.append_triples(@submission.id, artifacts[:mapping_triples], mime_type = "application/x-turtle") + t1 = Time.now + logger.info("Mapping labels asserted in #{t1 - t0} sec.") + logger.flush + end + end + + def generate_missing_labels_post(artifacts = {}, logger, paging) + logger.info("end generate_missing_labels traversed #{artifacts[:count_classes]} classes") + logger.info("Saved generated labels in #{artifacts[:save_in_file]}") + artifacts[:fsave].close() + artifacts[:fsave_mappings].close() + logger.flush + end + + def generate_obsolete_classes(logger, file_path) + @submission.bring(:obsoleteProperty) if @submission.bring?(:obsoleteProperty) + @submission.bring(:obsoleteParent) if @submission.bring?(:obsoleteParent) + classes_deprecated = [] + if @submission.obsoleteProperty && + @submission.obsoleteProperty.to_s != "http://www.w3.org/2002/07/owl#deprecated" + + predicate_obsolete = RDF::URI.new(@submission.obsoleteProperty.to_s) + query_obsolete_predicate = < 0 + classes_deprecated.uniq! 
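# Illustrative sketch, not part of the patch: the heredoc body of
# query_obsolete_predicate is not visible a few lines above. A query of this
# general shape — selecting every class in the submission graph that carries the
# custom obsolete predicate — would fit the surrounding code; the variable names
# and the unconstrained ?value are assumptions (the original may restrict ?value,
# e.g. to a boolean true literal).
query_obsolete_predicate = <<eos
SELECT ?class_id
FROM #{@submission.id.to_ntriples}
WHERE { ?class_id #{predicate_obsolete.to_ntriples} ?value . }
eos

Goo.sparql_query_client.query(query_obsolete_predicate).each_solution do |sol|
  classes_deprecated << sol[:class_id].to_s
end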
+ logger.info("Asserting owl:deprecated statement for #{classes_deprecated} classes") + save_in_file = File.join(File.dirname(file_path), "obsolete.ttl") + fsave = File.open(save_in_file, "w") + classes_deprecated.each do |class_id| + fsave.write(LinkedData::Utils::Triples.obselete_class_triple(class_id) + "\n") + end + fsave.close() + result = Goo.sparql_data_client.append_triples_from_file( + @submission.id, + save_in_file, + mime_type = "application/x-turtle") + end + end + + end + end +end + diff --git a/lib/ontologies_linked_data/services/submission_process/submission_process.rb b/lib/ontologies_linked_data/services/submission_process/submission_process.rb new file mode 100644 index 00000000..4e7433e8 --- /dev/null +++ b/lib/ontologies_linked_data/services/submission_process/submission_process.rb @@ -0,0 +1,14 @@ +module LinkedData + module Services + class OntologySubmissionProcess + + def initialize(submission) + @submission = submission + end + + def process(logger, options = {}) + raise NotImplementedError + end + end + end +end diff --git a/lib/ontologies_linked_data/services/submission_process/submission_processor.rb b/lib/ontologies_linked_data/services/submission_process/submission_processor.rb new file mode 100644 index 00000000..107f70cd --- /dev/null +++ b/lib/ontologies_linked_data/services/submission_process/submission_processor.rb @@ -0,0 +1,126 @@ +module LinkedData + module Services + class OntologyProcessor < OntologySubmissionProcess + + ################################################################ + # Possible options with their defaults: + # process_rdf = false + # index_search = false + # index_properties = false + # index_commit = false + # run_metrics = false + # reasoning = false + # diff = false + # archive = false + # if no options passed, ALL actions, except for archive = true + ################################################################ + def process(logger, options = nil) + process_submission(logger, options) + end + + private + + def process_submission(logger, options = {}) + # Wrap the whole process so we can email results + begin + archive, diff, index_commit, index_properties, + index_search, process_rdf, reasoning, run_metrics = get_options(options) + + @submission.bring_remaining + @submission.ontology.bring_remaining + + logger.info("Starting to process #{@submission.ontology.acronym}/submissions/#{@submission.submissionId}") + logger.flush + LinkedData::Parser.logger = logger + + if archive + @submission.archive + else + + @submission.generate_rdf(logger, reasoning: reasoning) if process_rdf + + parsed = @submission.ready?(status: [:rdf, :rdf_labels]) + + if index_search + unless parsed + raise StandardError, "The submission #{@submission.ontology.acronym}/submissions/#{@submission.submissionId} + cannot be indexed because it has not been successfully parsed" + end + @submission.index(logger, commit: index_commit) + end + + if index_properties + unless parsed + raise Exception, "The properties for the submission #{@submission.ontology.acronym}/submissions/#{@submission.submissionId} + cannot be indexed because it has not been successfully parsed" + + end + @submission.index_properties(logger, commit: index_commit) + end + + if run_metrics + unless parsed + raise StandardError, "Metrics cannot be generated on the submission + #{@submission.ontology.acronym}/submissions/#{@submission.submissionId} + because it has not been successfully parsed" + end + @submission.generate_metrics(logger) + end + @submission.generate_diff(logger) if diff + end 
+ + @submission.save + logger.info("Submission processing of #{@submission.id} completed successfully") + logger.flush + ensure + # make sure results get emailed + notify_submission_processed(logger) + end + @submission + end + + def notify_submission_processed(logger) + begin + LinkedData::Utils::Notifications.submission_processed(@submission) + rescue StandardError => e + logger.error("Email sending failed: #{e.message}\n#{e.backtrace.join("\n\t")}"); logger.flush + end + end + + def get_options(options) + + if options.empty? + process_rdf = true + index_search = true + index_properties = true + index_commit = true + run_metrics = true + reasoning = true + diff = true + archive = false + else + process_rdf = options[:process_rdf] == true + index_search = options[:index_search] == true + index_properties = options[:index_properties] == true + run_metrics = options[:run_metrics] == true + + reasoning = if !process_rdf || options[:reasoning] == false + false + else + true + end + + index_commit = if (!index_search && !index_properties) || options[:index_commit] == false + false + else + true + end + + diff = options[:diff] == true + archive = options[:archive] == true + end + [archive, diff, index_commit, index_properties, index_search, process_rdf, reasoning, run_metrics] + end + end + end +end
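# Illustrative usage sketch, not part of the patch: driving OntologyProcessor with
# the option semantics implemented in get_options above. `submission` and `logger`
# are assumptions; an explicit hash is passed because #process defaults options to
# nil while get_options calls options.empty?.
require 'logger'

logger    = Logger.new($stdout)
processor = LinkedData::Services::OntologyProcessor.new(submission)

# Empty hash: every step runs (RDF generation, search and property indexing,
# metrics, diff); archive stays off.
processor.process(logger, {})

# Re-index an already parsed submission: reasoning is forced off because
# process_rdf is false, and index_commit defaults to true because an indexing
# step was requested.
processor.process(logger, index_search: true, index_properties: true)

# Archive an old submission without reprocessing it.
processor.process(logger, archive: true)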