From e7cf04cf4ea6c8fb5c68503ba58aac28d9044c2f Mon Sep 17 00:00:00 2001 From: Kory Draughn Date: Tue, 20 Feb 2024 20:39:22 -0500 Subject: [PATCH] [#128] Replace use of elasticlient and cpr with boost.beast. --- elasticsearch.cmake | 20 +- es_mapping.json | 93 +- ...rtium_continuous_integration_build_hook.py | 4 +- libirods_rule_engine_plugin-elasticsearch.cpp | 919 ++++++++++-------- packaging/test_plugin_indexing.py | 115 +-- 5 files changed, 599 insertions(+), 552 deletions(-) diff --git a/elasticsearch.cmake b/elasticsearch.cmake index 6411690..9a145f6 100644 --- a/elasticsearch.cmake +++ b/elasticsearch.cmake @@ -9,8 +9,6 @@ string(REPLACE "_" "-" TARGET_NAME_HYPHENS ${TARGET_NAME}) include(IrodsExternals) -IRODS_MACRO_CHECK_DEPENDENCY_SET_FULLPATH_ADD_TO_IRODS_PACKAGE_DEPENDENCIES_LIST(ELASTICCLIENT elasticlientd68e30e3-0) - string(REPLACE ";" ", " ${TARGET_NAME}_PACKAGE_DEPENDENCIES_STRING "${IRODS_PACKAGE_DEPENDENCIES_LIST}") unset(IRODS_PACKAGE_DEPENDENCIES_LIST) @@ -40,10 +38,6 @@ target_include_directories( ${IRODS_INCLUDE_DIRS} ${IRODS_EXTERNALS_FULLPATH_BOOST}/include ${IRODS_EXTERNALS_FULLPATH_FMT}/include - ${IRODS_EXTERNALS_FULLPATH_JANSSON}/include - ${IRODS_EXTERNALS_FULLPATH_FMT}/include - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${IRODS_EXTERNALS_FULLPATH_ELASTICCLIENT}/include/ ) target_link_libraries( @@ -53,15 +47,21 @@ target_link_libraries( ${IRODS_EXTERNALS_FULLPATH_BOOST}/lib/libboost_filesystem.so ${IRODS_EXTERNALS_FULLPATH_BOOST}/lib/libboost_regex.so ${IRODS_EXTERNALS_FULLPATH_BOOST}/lib/libboost_system.so + ${IRODS_EXTERNALS_FULLPATH_BOOST}/lib/libboost_url.so ${IRODS_EXTERNALS_FULLPATH_FMT}/lib/libfmt.so - ${IRODS_EXTERNALS_FULLPATH_ELASTICCLIENT}/lib/libelasticlient.so - ${IRODS_EXTERNALS_FULLPATH_ELASTICCLIENT}/lib/libjsoncpp.so - ${IRODS_EXTERNALS_FULLPATH_ELASTICCLIENT}/lib/libcpr.so irods_common nlohmann_json::nlohmann_json ) -target_compile_definitions(${TARGET_NAME} PRIVATE ${IRODS_PLUGIN_POLICY_COMPILE_DEFINITIONS} ${IRODS_COMPILE_DEFINITIONS} ${IRODS_COMPILE_DEFINITIONS_PRIVATE} BOOST_SYSTEM_NO_DEPRECATED) +target_compile_definitions( + ${TARGET_NAME} + PRIVATE + ${IRODS_PLUGIN_POLICY_COMPILE_DEFINITIONS} + ${IRODS_COMPILE_DEFINITIONS} + ${IRODS_COMPILE_DEFINITIONS_PRIVATE} + BOOST_SYSTEM_NO_DEPRECATED + IRODS_PLUGIN_VERSION="${IRODS_PLUGIN_VERSION}" +) target_compile_options(${TARGET_NAME} PRIVATE -Wno-write-strings) set_property(TARGET ${TARGET_NAME} PROPERTY CXX_STANDARD ${IRODS_CXX_STANDARD}) diff --git a/es_mapping.json b/es_mapping.json index b13ffeb..e2e973a 100644 --- a/es_mapping.json +++ b/es_mapping.json @@ -1,45 +1,48 @@ -{"properties": { - "url": { - "type": "text" - }, - "zoneName": { - "type": "keyword" - }, - "absolutePath": { - "type": "keyword" - }, - "fileName": { - "type": "text" - }, - "parentPath": { - "type": "text" - }, - "isFile": { - "type": "boolean" - }, - "dataSize": { - "type": "long" - }, - "mimeType": { - "type": "keyword" - }, - "lastModifiedDate": { - "type": "date", - "format": "epoch_second" - }, - "metadataEntries": { - "type": "nested", - "properties": { - "attribute": { - "type": "keyword" - }, - "value": { - "type": "text" - }, - "unit": { - "type": "keyword" - } - } - } - } - } +{ + "mappings": { + "properties": { + "url": { + "type": "text" + }, + "zoneName": { + "type": "keyword" + }, + "absolutePath": { + "type": "keyword" + }, + "fileName": { + "type": "text" + }, + "parentPath": { + "type": "text" + }, + "isFile": { + "type": "boolean" + }, + "dataSize": { + "type": "long" + }, + "mimeType": { + "type": "keyword" + }, + "lastModifiedDate": { + "type": "date", + "format": "epoch_second" + }, + "metadataEntries": { + "type": "nested", + "properties": { + "attribute": { + "type": "keyword" + }, + "value": { + "type": "text" + }, + "unit": { + "type": "keyword" + } + } + } + } + } +} diff --git a/irods_consortium_continuous_integration_build_hook.py b/irods_consortium_continuous_integration_build_hook.py index e6e5098..2f32fba 100644 --- a/irods_consortium_continuous_integration_build_hook.py +++ b/irods_consortium_continuous_integration_build_hook.py @@ -15,12 +15,10 @@ def add_cmake_to_front_of_path(): def install_building_dependencies(externals_directory): externals_list = [ - 'irods-externals-boost1.78.0-0', + 'irods-externals-boost1.81.0-0', 'irods-externals-clang-runtime13.0.0-0', 'irods-externals-clang13.0.0-0', 'irods-externals-cmake3.21.4-0', - "irods-externals-cpr1.3.0-1", - "irods-externals-elasticlientd68e30e3-0", 'irods-externals-json3.10.4-0' ] if externals_directory == 'None' or externals_directory is None: diff --git a/libirods_rule_engine_plugin-elasticsearch.cpp b/libirods_rule_engine_plugin-elasticsearch.cpp index 2cb15e1..812addc 100644 --- a/libirods_rule_engine_plugin-elasticsearch.cpp +++ b/libirods_rule_engine_plugin-elasticsearch.cpp @@ -1,84 +1,60 @@ +#include "configuration.hpp" +#include "plugin_specific_configuration.hpp" +#include "utilities.hpp" -#define IRODS_IO_TRANSPORT_ENABLE_SERVER_SIDE_API -#define IRODS_QUERY_ENABLE_SERVER_SIDE_API -#include +#include +#include +#include #include #include -#include "utilities.hpp" -#include "plugin_specific_configuration.hpp" -#include "configuration.hpp" -#include +#include #include -#include -#include -#include -#include +#define IRODS_QUERY_ENABLE_SERVER_SIDE_API +#include + +#define IRODS_IO_TRANSPORT_ENABLE_SERVER_SIDE_API +#include #include + #define IRODS_FILESYSTEM_ENABLE_SERVER_SIDE_API #include -#include -#include -#include -#include -#include -#include - +#include #include #include -#include -#include -#include -#include -#include + #include -#include #include +#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include -namespace { +#include +#include - using HTTPMethod = elasticlient::Client::HTTPMethod; - - namespace ElasticSearch { - - cpr::Response index (const std::string & version, - elasticlient::Client &cl, - const std::string & index_name, - const std::string & mapping_type, - const std::string & doc_id, - const std::string & body) - { - if (version < "7.") - return cl.index (index_name, mapping_type, doc_id, body); - else - return cl.performRequest (HTTPMethod::PUT, - index_name + "/_doc/" + doc_id, - body); - } +#include +#include +#include +#include +#include +#include - cpr::Response remove (const std::string & version, - elasticlient::Client &cl, - const std::string & index_name, - const std::string & mapping_type, - const std::string & doc_id) - { - if (version < "7.") - return cl.remove (index_name, mapping_type, doc_id); - else - return cl.performRequest (HTTPMethod::DELETE, - index_name + "/_doc/" + doc_id, - ""); - } +namespace { - } // namespace ElasticSearch + namespace beast = boost::beast; + namespace http = beast::http; + using http_response = std::optional>; + using json = nlohmann::json; using string_t = std::string; struct configuration : irods::indexing::configuration { @@ -122,6 +98,130 @@ namespace { std::string metadata_index_policy; std::string metadata_purge_policy; + auto send_http_request(http::verb _verb, const std::string_view _target, const std::string_view _body = "") + -> http_response + { + for (auto&& host : config->hosts_) { + if (host.empty()) { + rodsLog(LOG_ERROR, "%s: empty service URL.", __func__); + continue; + } + + namespace urls = boost::urls; + + urls::result result = urls::parse_uri(host); + if (!result) { + rodsLog(LOG_ERROR, fmt::format("{}: could not parse service URL [{}].", __func__, host).c_str()); + continue; + } + + const auto use_tls = (result->has_scheme() && result->scheme_id() == urls::scheme::https); + + namespace net = boost::asio; + using tcp = net::ip::tcp; + + // This lambda encapsulates the HTTP logic that's independent of the type of stream. + const auto construct_and_send_http_request = [_verb, _target, _body](auto& _stream) { + http::request req{_verb, _target, 11}; + req.set(http::field::host, boost::asio::ip::host_name()); + req.set(http::field::user_agent, "iRODS Indexing Plugin/" IRODS_PLUGIN_VERSION); + req.set(http::field::content_type, "application/json"); + + if (!_body.empty()) { + req.body() = _body; + + std::stringstream ss; + ss << req; + const auto s = ss.str(); + + if (s.size() > 256) { + rodsLog(LOG_DEBUG, + fmt::format("{}: sending request = (truncated) [{} ...]", __func__, s.substr(0, 256)) + .c_str()); + } + else { + rodsLog(LOG_DEBUG, fmt::format("{}: sending request = [{}]", __func__, s).c_str()); + } + + req.prepare_payload(); + } + + http::write(_stream, req); + + beast::flat_buffer buffer; + http::response res; + http::read(_stream, buffer, res); + + std::stringstream ss; + ss << res; + rodsLog(LOG_DEBUG, fmt::format("{}: elasticsearch response = [{}]", __func__, ss.str()).c_str()); + + return res; + }; + + try { + net::io_context ioc; + + tcp::resolver resolver{ioc}; + const auto results = resolver.resolve(result->host(), result->port()); + + if (use_tls) { + net::ssl::context tls_ctx{net::ssl::context::tlsv12_client}; + tls_ctx.set_default_verify_paths(); + tls_ctx.set_verify_mode(net::ssl::verify_peer); + + beast::ssl_stream stream{ioc, tls_ctx}; + + // Set SNI hostname (many hosts need this to handshake successfully). + if (!::SSL_set_tlsext_host_name(stream.native_handle(), result->host().c_str())) { + beast::error_code ec{static_cast(::ERR_get_error()), net::error::get_ssl_category()}; + throw beast::system_error{ec}; + } + + beast::get_lowest_layer(stream).connect(results); + + auto res = construct_and_send_http_request(stream); + + beast::error_code ec; + stream.shutdown(ec); + + if (net::error::eof == ec) { + // Rationale: + // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error + ec = {}; + } + + if (ec) { + throw beast::system_error{ec}; + } + + return res; + } + else { + beast::tcp_stream stream{ioc}; + stream.connect(results); + + auto res = construct_and_send_http_request(stream); + + beast::error_code ec; + stream.socket().shutdown(tcp::socket::shutdown_both, ec); + + // not_connected happens sometimes, so don't bother reporting it. + if (ec && ec != beast::errc::not_connected) { + throw beast::system_error{ec}; + } + + return res; + } + } + catch (const std::exception& e) { + rodsLog(LOG_ERROR, fmt::format("{}: {}", __func__, e.what()).c_str()); + } + } + + return std::nullopt; + } // send_http_request + void apply_document_type_policy( ruleExecInfo_t* _rei, const std::string& _object_path, @@ -139,10 +239,6 @@ namespace { } // apply_document_type_policy - void log_fcn(elasticlient::LogLevel, const std::string& _msg) { - rodsLog(LOG_DEBUG, "ELASTICLIENT :: [%s]", _msg.c_str()); - } // log_fcn - std::string generate_id() { using namespace boost::archive::iterators; std::stringstream os; @@ -253,191 +349,154 @@ namespace { } } // update_object_metadata - void invoke_indexing_event_full_text( - ruleExecInfo_t* _rei, - const std::string& _object_path, - const std::string& _source_resource, - const std::string& _index_name) { - - try { - std::string doc_type{"text"}; - apply_document_type_policy( - _rei, - _object_path, - _source_resource, - &doc_type); - - const long read_size{config->read_size_}; - const int bulk_count{config->bulk_count_}; - const std::string object_id{get_object_index_id(_rei, _object_path)}; - - std::shared_ptr client = - std::make_shared( - config->hosts_); - elasticlient::Bulk bulkIndexer(client); - elasticlient::SameIndexBulkData bulk(_index_name, bulk_count); - - char read_buff[read_size]; - irods::experimental::io::server::basic_transport xport(*_rei->rsComm); - irods::experimental::io::idstream ds{xport, _object_path}; - - int chunk_counter{0}; - bool need_final_perform{false}; - while(ds) { - ds.read(read_buff, read_size); - std::string data{read_buff}; - - // filter out new line characters - data.erase( - std::remove_if( - data.begin(), - data.end(), - [](wchar_t c) {return (std::iscntrl(c) || c == '"' || c == '\'' || c == '\\');}), - data.end()); - - std::string index_id{ - boost::str( - boost::format( - "%s_%d") - % object_id - % chunk_counter)}; - ++chunk_counter; - - std::string payload{ - boost::str( - boost::format( - "{ \"absolutePath\" : \"%s\", \"data\" : \"%s\" }") - % _object_path - % data)}; - - need_final_perform = true; - bool done = bulk.indexDocument(doc_type, index_id, payload.data()); - if(done) { - need_final_perform = false; - // have reached bulk_count chunks - auto error_count = bulkIndexer.perform(bulk); - if(error_count > 0) { - rodsLog( - LOG_ERROR, - "Encountered %d errors when indexing [%s]", - error_count, - _object_path.c_str()); - } - bulk.clear(); - } - } // while - - if(need_final_perform) { - auto error_count = bulkIndexer.perform(bulk); - if(error_count > 0) { - rodsLog( - LOG_ERROR, - "Encountered %d errors when indexing [%s]", - error_count, - _object_path.c_str()); - } - bulk.clear(); - } - } - catch(const irods::exception& _e) { - rodsLog( - LOG_ERROR, - "Exception [%s]", - _e.what()); - auto irods_error = _e.code(); - if (irods_error != CAT_NO_ROWS_FOUND) { - THROW( - irods_error, - _e.what()); - } - } - catch(const std::runtime_error& _e) { - rodsLog( - LOG_ERROR, - "Exception [%s]", - _e.what()); - THROW( - SYS_INTERNAL_ERR, - _e.what()); - } - catch(const std::exception& _e) { - rodsLog( - LOG_ERROR, - "Exception [%s]", - _e.what()); - THROW( - SYS_INTERNAL_ERR, - _e.what()); - } - } // invoke_indexing_event_full_text - - void invoke_purge_event_full_text( - ruleExecInfo_t* _rei, - const std::string& _object_path, - const std::string& _source_resource, - const std::string& _index_name) { - - try { - std::string doc_type{"text"}; - apply_document_type_policy( - _rei, - _object_path, - _source_resource, - &doc_type); - - const long read_size{config->read_size_}; - const int bulk_count{config->bulk_count_}; - const std::string object_id{get_object_index_id(_rei, _object_path)}; - elasticlient::Client client{config->hosts_}; - - int chunk_counter{0}; - - bool done{false}; - while(!done) { - std::string index_id{ - boost::str( - boost::format( - "%s_%d") - % object_id - % chunk_counter)}; - ++chunk_counter; - const cpr::Response response = client.remove(_index_name, doc_type, index_id); - if(response.status_code != 200) { - done = true; - if(response.status_code == 404) { // meaningful for logging - rodsLog (LOG_NOTICE, boost::str(boost::format("elasticlient 404: no index entry for chunk (%d) of object_id '%d' " - "in index '%s'") % chunk_counter % object_id % _index_name).c_str()); - } - } - } // while - } - catch(const std::runtime_error& _e) { - rodsLog( - LOG_ERROR, - "Exception [%s]", - _e.what()); - THROW( - SYS_INTERNAL_ERR, - _e.what()); - } - catch(const irods::exception& _e) { - if (_e.code() == CAT_NO_ROWS_FOUND) { - return; - } - THROW( - SYS_INTERNAL_ERR, - _e.what()); - } - catch(const std::exception& _e) { - rodsLog( - LOG_ERROR, - "Exception [%s]", - _e.what()); - THROW( - SYS_INTERNAL_ERR, - _e.what()); - } - } // invoke_purge_event_full_text + void invoke_indexing_event_full_text(ruleExecInfo_t* _rei, + const std::string& _object_path, + const std::string& _source_resource, + const std::string& _index_name) + { + try { + const std::string object_id = get_object_index_id(_rei, _object_path); + std::vector buffer(config->read_size_); + irods::experimental::io::server::basic_transport xport(*_rei->rsComm); + irods::experimental::io::idstream in{xport, _object_path}; + + int chunk_counter{0}; + bool need_final_perform{false}; + std::stringstream ss; + + while (in) { + in.read(buffer.data(), buffer.size()); + + // The indexing instruction. + // clang-format off + ss << json{{"index", { + {"_id", fmt::format("{}_{}", object_id, chunk_counter++)} + }}}.dump() << '\n'; + // clang-format on + + // The defaults for the .dump() member function. + constexpr int indent = -1; + constexpr char indent_char = ' '; + constexpr bool ensure_ascii = false; + + // The data to index. + // The version of .dump() invoked here instructs the library to ignore + // invalid UTF-8 sequences. All bytes are copied to the output unchanged. + // clang-format off + ss << json{ + {"absolutePath", _object_path}, + {"data", std::string_view(buffer.data(), in.gcount())} + }.dump(indent, indent_char, ensure_ascii, json::error_handler_t::ignore) << '\n'; + // clang-format on + + // Send bulk request if chunk counter has reached bulk limit. + if (chunk_counter == config->bulk_count_) { + chunk_counter = 0; + + const auto res = send_http_request(http::verb::post, _index_name + "/_bulk", ss.str()); + + if (!res.has_value()) { + rodsLog(LOG_ERROR, "%s: No response from elasticsearch host.", __func__); + } + else { + rodsLog(LOG_ERROR, + "%s: Error sending request to elasticsearch host. [http_status_code=[%d]]", + __func__, + res->result_int()); + } + + ss.str(""); + } + } + + if (chunk_counter > 0) { + // Elasticsearch limits the maximum size of a HTTP request to 100mb. + // See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html. + const auto res = send_http_request(http::verb::post, _index_name + "/_bulk", ss.str()); + + if (!res.has_value()) { + rodsLog(LOG_ERROR, "%s: No response from elasticsearch host.", __func__); + } + else { + rodsLog(LOG_ERROR, + "%s: Error sending request to elasticsearch host. [http_status_code=[%d]]", + __func__, + res->result_int()); + } + } + } + catch (const irods::exception& _e) { + rodsLog(LOG_ERROR, "Exception [%s]", _e.what()); + auto irods_error = _e.code(); + if (irods_error != CAT_NO_ROWS_FOUND) { + THROW(irods_error, _e.what()); + } + } + catch (const std::runtime_error& _e) { + rodsLog(LOG_ERROR, "Exception [%s]", _e.what()); + THROW(SYS_INTERNAL_ERR, _e.what()); + } + catch (const std::exception& _e) { + rodsLog(LOG_ERROR, "Exception [%s]", _e.what()); + THROW(SYS_INTERNAL_ERR, _e.what()); + } + } // invoke_indexing_event_full_text + + void invoke_purge_event_full_text(ruleExecInfo_t* _rei, + const std::string& _object_path, + const std::string& _source_resource, + const std::string& _index_name) + { + try { + const std::string object_id{get_object_index_id(_rei, _object_path)}; + int chunk_counter{0}; + + while (true) { + const auto index_entry = fmt::format("{}/_doc/{}_{}", _index_name, object_id, chunk_counter++); + const auto response = send_http_request(http::verb::delete_, index_entry); + + if (!response.has_value()) { + rodsLog(LOG_ERROR, + "%s: No response from elasticsearch host. Index entry [%s] may not have been purged", + __func__, + index_entry.c_str()); + break; + } + + // Some objects will be split into multiple chunks. Because we don't track them, + // the only way to know when all chunks have been processed is to send requests until + // we receive a HTTP status code of 404. Here, we expand the status code range to + // anything other than 200. + if (response->result_int() != 200) { + if (response->result_int() == 404) { // meaningful for logging + rodsLog(LOG_NOTICE, + fmt::format("{}: No index entry for chunk [{}] of object_id [{}] in index [{}]", + __func__, + chunk_counter, + object_id, + _index_name) + .c_str()); + } + + break; + } + } + } + catch (const std::runtime_error& _e) { + rodsLog(LOG_ERROR, "Exception [%s]", _e.what()); + THROW(SYS_INTERNAL_ERR, _e.what()); + } + catch (const irods::exception& _e) { + if (_e.code() == CAT_NO_ROWS_FOUND) { + return; + } + THROW(SYS_INTERNAL_ERR, _e.what()); + } + catch (const std::exception& _e) { + rodsLog(LOG_ERROR, "Exception [%s]", _e.what()); + THROW(SYS_INTERNAL_ERR, _e.what()); + } + } // invoke_purge_event_full_text std::string get_metadata_index_id( const std::string& _index_id, @@ -459,135 +518,133 @@ namespace { } // get_metadata_index_id - void invoke_indexing_event_metadata( - ruleExecInfo_t* _rei, - const std::string& _object_path, - const std::string& _attribute, - const std::string& _value, - const std::string& _unit, - const std::string& _index_name, - nlohmann::json & obj_meta ) { - - try { - bool is_coll{}; - elasticlient::Client client{config->hosts_}; - auto object_id = get_object_index_id( _rei, _object_path, &is_coll); - - std::optional jsonarray; - get_metadata_for_object_index_id( _rei, object_id, is_coll, jsonarray ); - if (!jsonarray) { - irods::log( LOG_WARNING, fmt::format("In {}, function {}: Aborted indexing metadata, null AVU array returned for object [{}]", - __FILE__, __func__,_object_path)); - return; - } - obj_meta ["metadataEntries"] = *jsonarray; - - const cpr::Response response = ElasticSearch::index(config->es_version_, client, _index_name, "text", object_id, obj_meta.dump()); - - if(response.status_code != 200 && response.status_code != 201) { - THROW( - SYS_INTERNAL_ERR, - boost::format("failed to index metadata [%s] [%s] [%s] for [%s] code [%d] message [%s]") - % _attribute - % _value - % _unit - % _object_path - % response.status_code - % response.text); - } - } - catch(const irods::exception& _e) { - rodsLog( - LOG_ERROR, - "Exception [%s]", - _e.what()); - auto irods_error = _e.code(); - if (irods_error != CAT_NO_ROWS_FOUND) { - THROW( - irods_error, - _e.what()); - } - } - catch(const std::runtime_error& _e) { - rodsLog( - LOG_ERROR, - "Exception [%s]", - _e.what()); - THROW( - SYS_INTERNAL_ERR, - _e.what()); - } - catch(const std::exception& _e) { - rodsLog( - LOG_ERROR, - "Exception [%s]", - _e.what()); - THROW( - SYS_INTERNAL_ERR, - _e.what()); - } - } // invoke_indexing_event_metadata - - void invoke_purge_event_metadata( - ruleExecInfo_t* _rei, - const std::string& _object_path, - const std::string& _attribute, - const std::string& _value, - const std::string& _unit, - const std::string& _index_name, const nlohmann::json & = {} ) - { - - try { - elasticlient::Client client{config->hosts_}; - namespace fsvr = irods::experimental::filesystem; - // we now accept object id or path here, so pep_api_rm_coll_post can purge - std::string object_id { - fsvr::path{_object_path}.is_absolute() ? get_object_index_id( _rei, _object_path) - : _object_path - }; - const cpr::Response response = ElasticSearch::remove(config->es_version_, client, _index_name, "text", object_id); - switch(response.status_code) { - // either the index has been deleted, or the AVU was cleared unexpectedly - case 404: - rodsLog (LOG_NOTICE, boost::str(boost::format("elasticlient 404: no index entry for AVU (%s,%s,%s) on object '%s' in " - "index '%s'") % _attribute % _value % _unit % _object_path % _index_name).c_str()); - break; - // routinely expected return codes ( not logged ): - case 200: break; - case 201: break; - // unexpected return codes: - default: - THROW( - SYS_INTERNAL_ERR, - boost::format("failed to index metadata [%s] [%s] [%s] for [%s] code [%d] message [%s]") - % _attribute - % _value - % _unit - % _object_path - % response.status_code - % response.text); - } - } - catch(const std::runtime_error& _e) { - rodsLog( - LOG_ERROR, - "Exception [%s]", - _e.what()); - THROW( - SYS_INTERNAL_ERR, - _e.what()); - } - catch(const std::exception& _e) { - rodsLog( - LOG_ERROR, - "Exception [%s]", - _e.what()); - THROW( - SYS_INTERNAL_ERR, - _e.what()); - } - - } // invoke_purge_event_metadata + void invoke_indexing_event_metadata(ruleExecInfo_t* _rei, + const std::string& _object_path, + const std::string& _attribute, + const std::string& _value, + const std::string& _unit, + const std::string& _index_name, + nlohmann::json& obj_meta) + { + try { + bool is_coll{}; + auto object_id = get_object_index_id(_rei, _object_path, &is_coll); + + std::optional jsonarray; + get_metadata_for_object_index_id(_rei, object_id, is_coll, jsonarray); + if (!jsonarray) { + irods::log(LOG_WARNING, + fmt::format( + "In {}, function {}: Aborted indexing metadata, null AVU array returned for object [{}]", + __FILE__, + __func__, + _object_path)); + return; + } + obj_meta["metadataEntries"] = *jsonarray; + + const auto response = + send_http_request(http::verb::put, fmt::format("{}/_doc/{}", _index_name, object_id), obj_meta.dump()); + + if (!response.has_value()) { + THROW(SYS_INTERNAL_ERR, + fmt::format("failed to index metadata [{}] [{}] [{}] for [{}]. No response.", + _attribute, + _value, + _unit, + _object_path)); + } + + if (response->result_int() != 200 && response->result_int() != 201) { + THROW(SYS_INTERNAL_ERR, + fmt::format("failed to index metadata [{}] [{}] [{}] for [{}] code [{}] message [{}]", + _attribute, + _value, + _unit, + _object_path, + response->result_int(), + response->body())); + } + } + catch (const irods::exception& _e) { + rodsLog(LOG_ERROR, "Exception [%s]", _e.what()); + auto irods_error = _e.code(); + if (irods_error != CAT_NO_ROWS_FOUND) { + THROW(irods_error, _e.what()); + } + } + catch (const std::runtime_error& _e) { + rodsLog(LOG_ERROR, "Exception [%s]", _e.what()); + THROW(SYS_INTERNAL_ERR, _e.what()); + } + catch (const std::exception& _e) { + rodsLog(LOG_ERROR, "Exception [%s]", _e.what()); + THROW(SYS_INTERNAL_ERR, _e.what()); + } + } // invoke_indexing_event_metadata + + void invoke_purge_event_metadata(ruleExecInfo_t* _rei, + const std::string& _object_path, + const std::string& _attribute, + const std::string& _value, + const std::string& _unit, + const std::string& _index_name, + const nlohmann::json& = {}) + { + try { + namespace fs = irods::experimental::filesystem; + + // we now accept object id or path here, so pep_api_rm_coll_post can purge + const auto object_id = + fs::path{_object_path}.is_absolute() ? get_object_index_id(_rei, _object_path) : _object_path; + + const auto response = + send_http_request(http::verb::delete_, fmt::format("{}/_doc/{}", _index_name, object_id)); + + if (!response.has_value()) { + auto msg = fmt::format("{}: No response from elasticsearch host.", __func__); + rodsLog(LOG_ERROR, msg.c_str()); + THROW(SYS_INTERNAL_ERR, std::move(msg)); + } + + switch (response->result_int()) { + // either the index has been deleted, or the AVU was cleared unexpectedly + case 404: + rodsLog(LOG_NOTICE, + fmt::format("received HTTP status code of 404: no index entry for AVU ({}, {}, {}) on " + "object [{}] in index [{}]", + _attribute, + _value, + _unit, + _object_path, + _index_name) + .c_str()); + break; + // routinely expected return codes ( not logged ): + case 200: + case 201: + break; + // unexpected return codes: + default: + THROW(SYS_INTERNAL_ERR, + fmt::format("failed to index metadata [{}] [{}] [{}] for [{}] code [{}] message [{}]", + _attribute, + _value, + _unit, + _object_path, + response->result_int(), + response->body())); + } + } + catch (const std::runtime_error& _e) { + rodsLog(LOG_ERROR, "Exception [%s]", _e.what()); + THROW(SYS_INTERNAL_ERR, _e.what()); + } + catch (const std::exception& _e) { + rodsLog(LOG_ERROR, "Exception [%s]", _e.what()); + THROW(SYS_INTERNAL_ERR, _e.what()); + } + } // invoke_purge_event_metadata } // namespace @@ -610,9 +667,6 @@ irods::error start( irods::indexing::policy::metadata::purge, "elasticsearch"); - if (getRodsLogLevel() > LOG_NOTICE) { - elasticlient::setLogFunction(log_fcn); - } return SUCCESS(); } @@ -722,50 +776,63 @@ irods::error exec_rule( } else if(_rn == "irods_policy_recursive_rm_object_by_path") { - using nlohmann::json; - auto it = _args.begin(); - const std::string the_path{ boost::any_cast(*it) }; - std::advance( it, 2 ); - const json recurse_info = json::parse(boost::any_cast(*it)); - auto escape = [] (std::string path_) -> std::string { boost::replace_all ( path_, "\\" , "\\\\"); - boost::replace_all ( path_, "?" , "\\?"); - boost::replace_all ( path_, "*" , "\\*"); - return path_;}; - auto escaped_path = escape(the_path); - std::string JtopLevel = json{{"query",{{"match",{{"absolutePath",escaped_path}} }} }}.dump(); - std::string JsubObject{""}; - try { - if (recurse_info["is_collection"].get()) { - JsubObject = json{{"query",{{"wildcard",{{"absolutePath",{{"value",escaped_path+"/*"}} }} }} }}.dump(); - } - } - catch(const std::domain_error & e) { - return ERROR(-1,fmt::format("_delete_by_query - stopped short of performRequest - domain_error: {}",e.what())); - } - elasticlient::Client client { config->hosts_ }; - - try { - rsComm_t& comm = *rei->rsComm; - for (const std::string & e : recurse_info["indices"]) { - const std::string del_by_query_URL { e + "/_delete_by_query" } ; - for (const std::string &json_out: {JtopLevel,JsubObject}) { - if (json_out == "") { continue; } - auto response = client.performRequest( HTTPMethod::POST, del_by_query_URL, json_out); - if(response.status_code != 200) { - irods::log( LOG_WARNING, fmt::format("_delete_by_query - response code not 200" - "\n\t- for path [{}]" - "\n\t- escaped as [{}]" - "\n\t- json request body is [{}]",the_path,escaped_path,json_out)); - } - } - } - } - catch (const elasticlient::ConnectionException & e) { - irods::log(LOG_ERROR, fmt::format("Cannot reach elasticsearch on : [{}]",fmt::join(config->hosts_, ", "))); - } - catch (const nlohmann::json::parse_error & e) { - irods::log(LOG_ERROR, fmt::format("JSON parse exception : [{}]", e.what())); - } + auto it = _args.begin(); + const std::string the_path{boost::any_cast(*it)}; + std::advance(it, 2); + const json recurse_info = json::parse(boost::any_cast(*it)); + + const auto escaped_path = [p = the_path]() mutable { + boost::replace_all(p, "\\", "\\\\"); + boost::replace_all(p, "?", "\\?"); + boost::replace_all(p, "*", "\\*"); + return p; + }(); + + std::string JtopLevel = json{{"query", {{"match", {{"absolutePath", escaped_path}}}}}}.dump(); + std::string JsubObject; + + try { + if (recurse_info.at("is_collection").get()) { + JsubObject = + json{{"query", {{"wildcard", {{"absolutePath", {{"value", escaped_path + "/*"}}}}}}}} + .dump(); + } + } + catch (const json::exception& e) { + return ERROR(SYS_LIBRARY_ERROR, e.what()); + } + + try { + for (const auto& e : recurse_info.at("indices")) { + const auto& index_name = e.get_ref(); + + for (const std::string& query_input : {JtopLevel, JsubObject}) { + if (query_input.empty()) { + continue; + } + + const auto response = send_http_request( + http::verb::post, fmt::format("{}/_delete_by_query", index_name), query_input); + + if (!response.has_value()) { + rodsLog(LOG_ERROR, + fmt::format("{}: No response from elasticsearch host.", __func__).c_str()); + continue; + } + + if (response->result_int() != 200) { + rodsLog( + LOG_WARNING, + fmt::format( + "{}: _delete_by_query failed [rule=[{}], path=[{}]", __func__, _rn, the_path) + .c_str()); + } + } + } + } + catch (const nlohmann::json::parse_error& e) { + rodsLog(LOG_ERROR, fmt::format("JSON parse exception : [{}]", e.what()).c_str()); + } } // "irods_policy_recursive_rm_object_by_path" else { return ERROR( diff --git a/packaging/test_plugin_indexing.py b/packaging/test_plugin_indexing.py index 1b432bf..1511a0f 100644 --- a/packaging/test_plugin_indexing.py +++ b/packaging/test_plugin_indexing.py @@ -166,9 +166,7 @@ class BooksUnknownError(RuntimeError): pass return retvalue -ES_VERSION = '7.x' -def es7_exactly (): return '8.' > ES_VERSION >= '7.' -def es7_or_later(): return ES_VERSION >= '7.' +ES_VERSION = '7.x' # TODO We probably don't need this anymore. @contextlib.contextmanager def indexing_plugin__installed(indexing_config=(), server_env_options={}): @@ -245,23 +243,21 @@ def install_python3_virtualenv_with_python_irodsclient(PATH='~/py3',preTestPRCIn # Assuming use for metadata style of index only def search_index_for_avu_attribute_name(index_name, attr_name, port = ELASTICSEARCH_PORT): - maptype = "" if es7_or_later() else "/text" - track_num_hits_as_int = "&track_total_hits=true&rest_total_hits_as_int=true" if es7_exactly() else "" out,_,rc = lib.execute_command_permissive( dedent("""\ - curl -X GET -H'Content-Type: application/json' HTTP://localhost:{port}/{index_name}{maptype}/_search?pretty=true{track_num_hits_as_int} -d ' + curl -X GET -H'Content-Type: application/json' http://localhost:{port}/{index_name}/_search?track_total_hits=true&rest_total_hits_as_int=true -d ' {{ "from": 0, "size" : 500, "_source" : ["absolutePath", "metadataEntries"], "query" : {{ "nested": {{ - "path": "metadataEntries", - "query": {{ - "bool": {{ - "must": [ - {{ "match": {{ "metadataEntries.attribute": "{attr_name}" }} }} - ] + "path": "metadataEntries", + "query": {{ + "bool": {{ + "must": [ + {{ "match": {{ "metadataEntries.attribute": "{attr_name}" }} }} + ] + }} }} - }} }} }} }}' """).format(**locals())) @@ -273,9 +269,8 @@ def search_index_for_avu_attribute_name(index_name, attr_name, port = ELASTICSEA def search_index_for_object_path(index_name, path_component, extra_source_fields="", port = ELASTICSEARCH_PORT): path_component_matcher = ("*/" + path_component + ("/*" if not path_component.endswith("$") else "")).rstrip("$") - track_num_hits_as_int = "&track_total_hits=true&rest_total_hits_as_int=true" if es7_exactly() else "" out,_,rc = lib.execute_command_permissive( dedent("""\ - curl -X GET -H'Content-Type: application/json' HTTP://localhost:{port}/{index_name}/text/_search?pretty=true{track_num_hits_as_int} -d ' + curl -X GET -H'Content-Type: application/json' http://localhost:{port}/{index_name}/_search?track_total_hits=true&rest_total_hits_as_int=true -d ' {{ "from": 0, "size" : 500, "_source" : ["absolutePath" {extra_source_fields} ], @@ -294,9 +289,8 @@ def search_index_for_object_path(index_name, path_component, extra_source_fields # Assuming use for fulltext style of index only def search_index_for_All_object_paths(index_name, port = ELASTICSEARCH_PORT): - track_num_hits_as_int = "&track_total_hits=true&rest_total_hits_as_int=true" if es7_exactly() else "" out,_,rc = lib.execute_command_permissive( dedent("""\ - curl -X GET -H'Content-Type: application/json' HTTP://localhost:{port}/{index_name}/text/_search?pretty=true{track_num_hits_as_int} -d ' + curl -X GET -H'Content-Type: application/json' http://localhost:{port}/{index_name}/_search?track_total_hits=true&rest_total_hits_as_int=true -d ' {{ "from": 0, "size" : 500, "_source" : ["absolutePath", "data"], @@ -308,60 +302,45 @@ def search_index_for_All_object_paths(index_name, port = ELASTICSEARCH_PORT): return out def create_fulltext_index(index_name = DEFAULT_FULLTEXT_INDEX, port = ELASTICSEARCH_PORT): - OPTION = "?include_type_name" if es7_or_later() else "" #--> ES7 allows 'text' mapping but requires hint - lib.execute_command("curl -X PUT -H'Content-Type: application/json' http://localhost:{port}/{index_name}".format(**locals())) - lib.execute_command("curl -X PUT -H'Content-Type: application/json' http://localhost:{port}/{index_name}/_mapping/text{OPTION} ".format(**locals()) + - """ -d '{ "properties" : { "absolutePath" : { "type" : "keyword" }, "data" : { "type" : "text" } } }'""") + mapping = json.dumps({ + "mappings": { + "properties": { + "absolutePath": {"type": "keyword"}, + "data": {"type": "text"} + } + } + }) + lib.execute_command("curl -X PUT -H'Content-Type: application/json' http://localhost:{port}/{index_name} -d'{mapping}'".format(**locals())) return index_name def create_metadata_index(index_name = DEFAULT_METADATA_INDEX, port = ELASTICSEARCH_PORT): - OPTION = "" if es7_or_later() else "/text" #--> switch away from 'text' mapping if using >= ES7 - lib.execute_command("curl -X PUT -H'Content-Type: application/json' http://localhost:{port}/{index_name}".format(**locals())) - lib.execute_command("curl -X PUT -H'Content-Type: application/json' http://localhost:{port}/{index_name}/_mapping{OPTION} ".format(**locals()) + - """ -d '{ "properties" : { - "url": { - "type": "text" - }, - "zoneName": { - "type": "keyword" - }, - "absolutePath": { - "type": "keyword" - }, - "fileName": { - "type": "text" - }, - "parentPath": { - "type": "text" - }, - "isFile": { - "type": "boolean" - }, - "dataSize": { - "type": "long" - }, - "mimeType": { - "type": "keyword" - }, - "lastModifiedDate": { - "type": "date", - "format": "epoch_second" - }, - "metadataEntries": { - "type": "nested", - "properties": { - "attribute": { - "type": "keyword" - }, - "value": { - "type": "text" - }, - "unit": { - "type": "keyword" - } - } - } - }}' """) + mapping = json.dumps({ + "mappings": { + "properties": { + "url": {"type": "text"}, + "zoneName": {"type": "keyword"}, + "absolutePath": {"type": "keyword"}, + "fileName": {"type": "text"}, + "parentPath": {"type": "text"}, + "isFile": {"type": "boolean"}, + "dataSize": {"type": "long"}, + "mimeType": {"type": "keyword"}, + "lastModifiedDate": { + "type": "date", + "format": "epoch_second" + }, + "metadataEntries": { + "type": "nested", + "properties": { + "attribute": {"type": "keyword"}, + "value": {"type": "text"}, + "unit": {"type": "keyword"} + } + } + } + } + }) + lib.execute_command("curl -X PUT -H'Content-Type: application/json' http://localhost:{port}/{index_name} -d'{mapping}'".format(**locals())) return index_name def delete_fulltext_index(index_name = DEFAULT_FULLTEXT_INDEX, port = ELASTICSEARCH_PORT):