Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] fix: Add keyword fields to sentence and language_script on crf index #469

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion datastore/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def create(self, err_if_exists=True, **kwargs):
self._connect()

if self._engine == ELASTICSEARCH:
es_url = elastic_search.connect.get_es_url()
create_map = [ # TODO: use namedtuples
(True, ELASTICSEARCH_INDEX_1, ELASTICSEARCH_DOC_TYPE, self._store_name,
self._check_doc_type_for_elasticsearch, elastic_search.create.create_entity_index),
Expand Down
191 changes: 110 additions & 81 deletions datastore/elastic_search/create.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Any, Dict, List

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
Expand All @@ -9,6 +9,8 @@
log_prefix = 'datastore.elastic_search.create'


# TODO: `connection` should be called `client`

def exists(connection, index_name):
# type: (Elasticsearch, str) -> bool
"""
Expand Down Expand Up @@ -58,16 +60,14 @@ def delete_index(connection, index_name, logger, err_if_does_not_exist=True, **k
logger.debug('%s: Delete Index %s: Operation successfully completed', log_prefix, index_name)


def _create_index(connection, index_name, doc_type, logger, mapping_body, err_if_exists=True, **kwargs):
# type: (Elasticsearch, str, str, logging.Logger, Dict[str, Any], bool, **Any) -> None
def _create_index(connection, index_name, err_if_exists=True, **kwargs):
# type: (Elasticsearch, str, bool, **Any) -> None
"""
Creates an Elasticsearch index needed for similarity based searching

Args:
connection: Elasticsearch client object
index_name: The name of the index
doc_type: The type of the documents that will be indexed
logger: logging object to log at debug and exception level
mapping_body: dict, mappings to put on the index
err_if_exists: if to raise error if the index already exists, defaults to True
kwargs:
master_timeout: Specify timeout for connection to master
Expand All @@ -90,101 +90,81 @@ def _create_index(connection, index_name, doc_type, logger, mapping_body, err_if
'Datastore().delete()'.format(index_name))
else:
return
try:
body = {
'index': {
'analysis': {
'analyzer': {
'my_analyzer': {
'tokenizer': 'standard',
'filter': ['standard', 'lowercase', 'my_stemmer']
}
},
'filter': {
'my_stemmer': {
'type': 'stemmer',
'name': 'english'
}

body = {
'index': {
'analysis': {
'analyzer': {
'my_analyzer': {
'tokenizer': 'standard',
'filter': ['standard', 'lowercase', 'my_stemmer']
}
},
'filter': {
'my_stemmer': {
'type': 'stemmer',
'name': 'english'
}
}
}
}
}

# At this point in time, elasticsearch-py doesn't accept arbitrary kwargs, so we have to filter kwargs per
# method. Refer https://github.com/elastic/elasticsearch-py/blob/master/elasticsearch/client/indices.py
create_kwargs = filter_kwargs(kwargs=kwargs,
keep_kwargs_keys=['master_timeout', 'timeout', 'update_all_types',
'wait_for_active_shards'])
connection.indices.create(index=index_name, body=body, **create_kwargs)

put_mapping_kwargs = filter_kwargs(kwargs=kwargs, keep_kwargs_keys=['allow_no_indices', 'expand_wildcards',
'ignore_unavailable',
'master_timeout', 'timeout',
'update_all_types'])
if doc_type:
connection.indices.put_mapping(body=mapping_body, index=index_name, doc_type=doc_type,
**put_mapping_kwargs)
else:
logger.debug('%s: doc_type not in arguments, skipping put_mapping on index ...' % log_prefix)
logger.debug('%s: Create Index %s: Operation successfully completed', log_prefix, index_name)
except Exception as e:
logger.exception('%s: Exception while creating index %s, Rolling back \n %s', log_prefix, index_name, e)
delete_index(connection=connection, index_name=index_name, logger=logger)
raise e
# At this point in time, elasticsearch-py doesn't accept arbitrary kwargs, so we have to filter kwargs per
# method. Refer https://github.com/elastic/elasticsearch-py/blob/master/elasticsearch/client/indices.py
valid_create_kwargs = ['master_timeout', 'timeout', 'update_all_types', 'wait_for_active_shards']
create_kwargs = filter_kwargs(kwargs=kwargs, keep_kwargs_keys=valid_create_kwargs)
connection.indices.create(index=index_name, body=body, **create_kwargs)


def create_entity_index(connection, index_name, doc_type, logger, **kwargs):
# type: (Elasticsearch, str, str, logging.Logger, **Any) -> None
def _put_index_mapping(connection, index_name, doc_type, mapping_body, **kwargs):
    # type: (Elasticsearch, str, str, Dict[str, Any], **Any) -> None
    """
    Put the given mapping body on an already-created index for the given document type.

    Args:
        connection: Elasticsearch client object
        index_name: The name of the index
        doc_type: The type of the documents that will be indexed
        mapping_body: dict, mappings to put on the index
        **kwargs:
            allow_no_indices: Whether to ignore if a wildcard indices expression resolves into no concrete indices.
                (This includes _all string or when no indices have been specified)
            expand_wildcards: Whether to expand wildcard expression to concrete indices that are open, closed or
                both., default 'open', valid choices are: 'open', 'closed', 'none', 'all'
            ignore_unavailable: Whether specified concrete indices should be ignored when unavailable
                (missing or closed)
            master_timeout: Specify timeout for connection to master
            timeout: Explicit operation timeout
            update_all_types: Whether to update the mapping for all fields with the same name across all types or not

    Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.put_mapping
    """
    # elasticsearch-py doesn't accept arbitrary kwargs, so we have to filter kwargs per method.
    # Refer https://github.com/elastic/elasticsearch-py/blob/master/elasticsearch/client/indices.py
    valid_mapping_kwargs = ['allow_no_indices', 'expand_wildcards', 'ignore_unavailable', 'master_timeout',
                            'timeout', 'update_all_types']
    put_mapping_kwargs = filter_kwargs(kwargs=kwargs, keep_kwargs_keys=valid_mapping_kwargs)
    connection.indices.put_mapping(body=mapping_body, index=index_name, doc_type=doc_type, **put_mapping_kwargs)


def _get_entity_index_mapping(doc_type):
return {
doc_type: {
'properties': {
'entity_data': {
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 32766}},
},
'language_script': {
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
},
'value': {
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 32766}},
},
'variants': {
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
'analyzer': 'my_analyzer',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 32766}},
'norms': {'enabled': False}, # Needed if we want to give longer variants higher scores
},
# other removed/unused fields, kept only for backward compatibility
'dict_type': {
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
},
'entity_data': {
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
},
'source_language': {
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
Expand All @@ -193,13 +173,12 @@ def create_entity_index(connection, index_name, doc_type, logger, **kwargs):
}
}

_create_index(connection, index_name, doc_type, logger, mapping_body, **kwargs)


def create_crf_index(connection, index_name, doc_type, logger, **kwargs):
def create_entity_index(connection, index_name, doc_type, logger, **kwargs):
    # type: (Elasticsearch, str, str, logging.Logger, **Any) -> None
    """
    Creates a mapping specific to entity storage in elasticsearch and makes a call to create_index
    to create the index with the given mapping body

    Args:
        connection: Elasticsearch client object
        index_name: The name of the index
        doc_type: The type of the documents that will be indexed
        logger: logging object to log at debug and exception level
        **kwargs:
            master_timeout: Specify timeout for connection to master
            timeout: Explicit operation timeout
            update_all_types: Whether to update the mapping for all fields with the same name across all types or not
            wait_for_active_shards: Set the number of active shards to wait for before the operation returns.
            allow_no_indices: Whether to ignore if a wildcard indices expression resolves into no concrete indices.
                (This includes _all string or when no indices have been specified)
            expand_wildcards: Whether to expand wildcard expression to concrete indices that are open, closed or both.,
                default 'open', valid choices are: 'open', 'closed', 'none', 'all'
            ignore_unavailable: Whether specified concrete indices should be ignored when unavailable
                (missing or closed)

    Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.create
    Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.put_mapping
    """
    try:
        _create_index(connection, index_name, **kwargs)
        mapping = _get_entity_index_mapping(doc_type)
        _put_index_mapping(connection, index_name, doc_type, mapping, **kwargs)
        logger.debug('%s: Create Index %s: Operation successfully completed', log_prefix, index_name)
    except Exception as e:
        # Roll back so a failed create/mapping call does not leave a half-configured index behind
        logger.exception('%s: Exception while creating index %s, Rolling back \n %s', log_prefix, index_name, e)
        delete_index(connection=connection, index_name=index_name, logger=logger)
        raise e


def _get_crf_index_mapping(doc_type):
return {
doc_type: {
'properties': {
'entity_data': {
'type': 'text'
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
},
'sentence': {
'enabled': False
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 32766}},
},
'entities': {
'enabled': False
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 32766}},
},
'language_script': {
'type': 'text'
'type': 'text',
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
}
}
}
}

_create_index(connection, index_name, doc_type, logger, mapping_body, **kwargs)

def create_crf_index(connection, index_name, doc_type, logger, **kwargs):
    # type: (Elasticsearch, str, str, logging.Logger, **Any) -> None
    """
    This method is used to create an index with mapping suited for story training_data

    Args:
        connection: Elasticsearch client object
        index_name: The name of the index
        doc_type: The type of the documents that will be indexed
        logger: logging object to log at debug and exception level
        **kwargs:
            master_timeout: Specify timeout for connection to master
            timeout: Explicit operation timeout
            update_all_types: Whether to update the mapping for all fields with the same name across all types or not
            wait_for_active_shards: Set the number of active shards to wait for before the operation returns.
            allow_no_indices: Whether to ignore if a wildcard indices expression resolves into no concrete indices.
                (This includes _all string or when no indices have been specified)
            expand_wildcards: Whether to expand wildcard expression to concrete indices that are open, closed or both.,
                default 'open', valid choices are: 'open', 'closed', 'none', 'all'
            ignore_unavailable: Whether specified concrete indices should be ignored when unavailable
                (missing or closed)

    Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.create
    Refer https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient.put_mapping
    """
    try:
        _create_index(connection, index_name, **kwargs)
        mapping = _get_crf_index_mapping(doc_type)
        _put_index_mapping(connection, index_name, doc_type, mapping, **kwargs)
        logger.debug('%s: Create Index %s: Operation successfully completed', log_prefix, index_name)
    except Exception as e:
        # Roll back so a failed create/mapping call does not leave a half-configured index behind
        logger.exception('%s: Exception while creating index %s, Rolling back \n %s', log_prefix, index_name, e)
        delete_index(connection=connection, index_name=index_name, logger=logger)
        raise e


def create_alias(connection, index_list, alias_name, logger, **kwargs):
Expand Down