Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tolerate failure #183

Open
wants to merge 6 commits into
base: 4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/Dockerfile.elasticsearch
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ COPY --chown=elasticsearch:elasticsearch spec/fixtures/test_certs/* $es_path/con

RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.enabled: true" >> $es_yml; fi
RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.key: $es_path/config/test_certs/es.key" >> $es_yml; fi
RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.certificate: $es_path/config/test_certs/es.crt" >> $es_yml; fi
RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.certificate: $es_path/config/test_certs/es.chain.crt" >> $es_yml; fi
RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.certificate_authorities: [ '$es_path/config/test_certs/ca.crt' ]" >> $es_yml; fi
RUN if [ "$SECURE_INTEGRATION" = "true" ] ; then echo "xpack.security.http.ssl.verification_mode: certificate" >> $es_yml; fi
RUN if [ "$SECURE_INTEGRATION" = "true" ] && [ -n "$ES_SSL_SUPPORTED_PROTOCOLS" ] ; then echo "xpack.security.http.ssl.supported_protocols: ${ES_SSL_SUPPORTED_PROTOCOLS}" >> $es_yml; fi
Expand Down
11 changes: 11 additions & 0 deletions .ci/setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# user_agent requires /etc/protocols, which is provided by netbase.
# https://github.com/jruby/jruby/issues/3955
if [ ! -f "/etc/protocols" ]; then
if [ $(command -v apt-get) ]; then
echo "installing netbase with apt-get"
sudo apt-get install -y netbase
else
echo "installing netbase with yum"
sudo yum install -y netbase
fi
fi
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 4.21.1
- Fix: prevent plugin crash when hits contain illegal structure [#183](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/183)
- When a hit cannot be converted to an event, the input now emits an event tagged with `_elasticsearch_input_failure` with an `[event][original]` containing a JSON-encoded string representation of the entire hit.

## 4.21.0
- Add support for custom headers [#217](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/217)

Expand Down
10 changes: 10 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ The plugin logs a warning when ECS is enabled and `target` isn't set.

TIP: Set the `target` option to avoid potential schema conflicts.

[id="plugins-{type}s-{plugin}-failure-handling"]
==== Failure handling

When this input plugin cannot create a structured `Event` from a hit result, it will instead create an `Event` that is tagged with `_elasticsearch_input_failure` whose `[event][original]` is a JSON-encoded string representation of the entire hit.

Common causes are:

- When the hit result contains top-level fields that are {logstash-ref}/processing.html#reserved-fields[reserved in Logstash] but do not have the expected shape. Use the <<plugins-{type}s-{plugin}-target>> directive to avoid conflicts with the top-level namespace.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karenzone can I get 👀 on this one to make sure I'm not risking breaking the docs build?

It looks like we added {logstash-ref}/processing.html#reserved-fields in 7.7 (it doesn't exist on 7.6 and before), but is present on all of the versions of the reference available in the quick picker (master, current, 8.5, 8.4, 7.17).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we added {logstash-ref}/processing.html#reserved-fields in 7.7 (it doesn't exist on 7.6 and before), but is present on all of the versions of the reference available in the quick picker (master, current, 8.5, 8.4, 7.17).

The gemlock release file controls which plugin versions get mapped/picked up for each stack version when we run docgen. For that reason, the plugin<->stack versioning generally works itself out without any manual intervention from us.

yaauie marked this conversation as resolved.
Show resolved Hide resolved
- When <<plugins-{type}s-{plugin}-docinfo>> is enabled and the docinfo fields cannot be merged into the hit result. Combine <<plugins-{type}s-{plugin}-target>> and <<plugins-{type}s-{plugin}-docinfo_target>> to avoid conflict.
yaauie marked this conversation as resolved.
Show resolved Hide resolved

[id="plugins-{type}s-{plugin}-options"]
==== Elasticsearch Input configuration options

Expand Down
30 changes: 24 additions & 6 deletions lib/logstash/inputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -350,24 +350,42 @@ def run(output_queue)
end

##
# @param output_queue [#<<]
# @param scroll_id [String]: a scroll id to resume
# @return [Array(Boolean,String)]: a tuple representing whether the response
#
def process_next_scroll(output_queue, scroll_id)
r = scroll_request(scroll_id)
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
[r['hits']['hits'].any?, r['_scroll_id']]
end

# This can be called externally from the query_executor
public
def push_hit(hit, output_queue, root_field = '_source')
event = targeted_event_factory.new_event hit[root_field]
set_docinfo_fields(hit, event) if @docinfo
event = event_from_hit(hit, root_field)
decorate(event)
output_queue << event
end

def event_from_hit(hit, root_field)
event = targeted_event_factory.new_event hit[root_field]
set_docinfo_fields(hit, event) if @docinfo

event
rescue => e
serialized_hit = hit.to_json
logger.warn("Event creation error, original data now in [event][original] field", message: e.message, exception: e.class, data: serialized_hit)
return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure'])
end

def set_docinfo_fields(hit, event)
# do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
docinfo_target = event.get(@docinfo_target) || {}

unless docinfo_target.is_a?(Hash)
@logger.error("Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event.to_hash_with_metadata)

# TODO: (colin) I am not sure raising is a good strategy here?
raise Exception.new("Elasticsearch input: incompatible event")
# expect error to be handled by `#event_from_hit`
fail RuntimeError, "Incompatible event; unable to merge docinfo fields into docinfo_target=`#{@docinfo_target}`"
end

@docinfo_fields.each do |field|
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-elasticsearch'
s.version = '4.21.0'
s.version = '4.21.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads query results from an Elasticsearch cluster"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
1 change: 1 addition & 0 deletions spec/fixtures/test_certs/GENERATED_AT
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2024-12-26T22:27:15+00:00
35 changes: 17 additions & 18 deletions spec/fixtures/test_certs/ca.crt
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDSTCCAjGgAwIBAgIUUcAg9c8B8jiliCkOEJyqoAHrmccwDQYJKoZIhvcNAQEL
BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l
cmF0ZWQgQ0EwHhcNMjEwODEyMDUxNDU1WhcNMjQwODExMDUxNDU1WjA0MTIwMAYD
VQQDEylFbGFzdGljIENlcnRpZmljYXRlIFRvb2wgQXV0b2dlbmVyYXRlZCBDQTCC
ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAK1HuusRuGNsztd4EQvqwcMr
8XvnNNaalerpMOorCGySEFrNf0HxDIVMGMCrOv1F8SvlcGq3XANs2MJ4F2xhhLZr
PpqVHx+QnSZ66lu5R89QVSuMh/dCMxhNBlOA/dDlvy+EJBl9H791UGy/ChhSgaBd
OKVyGkhjErRTeMIq7rR7UG6GL/fV+JGy41UiLrm1KQP7/XVD9UzZfGq/hylFkTPe
oox5BUxdxUdDZ2creOID+agtIYuJVIkelKPQ+ljBY3kWBRexqJQsvyNUs1gZpjpz
YUCzuVcXDRuJXYQXGqWXhsBPfJv+ZcSyMIBUfWT/G13cWU1iwufPy0NjajowPZsC
AwEAAaNTMFEwHQYDVR0OBBYEFMgkye5+2l+TE0I6RsXRHjGBwpBGMB8GA1UdIwQY
MBaAFMgkye5+2l+TE0I6RsXRHjGBwpBGMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZI
hvcNAQELBQADggEBAIgtJW8sy5lBpzPRHkmWSS/SCZIPsABW+cHqQ3e0udrI3CLB
G9n7yqAPWOBTbdqC2GM8dvAS/Twx4Bub/lWr84dFCu+t0mQq4l5kpJMVRS0KKXPL
DwJbUN3oPNYy4uPn5Xi+XY3BYFce5vwJUsqIxeAbIOxVTNx++k5DFnB0ESAM23QL
sgUZl7xl3/DkdO4oHj30gmTRW9bjCJ6umnHIiO3JoJatrprurUIt80vHC4Ndft36
NBQ9mZpequ4RYjpSZNLcVsxyFAYwEY4g8MvH0MoMo2RRLfehmMCzXnI/Wh2qEyYz
emHprBii/5y1HieKXlX9CZRb5qEPHckDVXW3znw=
MIIDFTCCAf2gAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MTIwMAYDVQQDEylFbGFz
dGljIENlcnRpZmljYXRlIFRvb2wgQXV0b2dlbmVyYXRlZCBDQTAeFw0yNDEyMjYy
MjI3MTVaFw0yNTEyMjYyMjI3MTVaMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlm
aWNhdGUgVG9vbCBBdXRvZ2VuZXJhdGVkIENBMIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEArUe66xG4Y2zO13gRC+rBwyvxe+c01pqV6ukw6isIbJIQWs1/
QfEMhUwYwKs6/UXxK+VwardcA2zYwngXbGGEtms+mpUfH5CdJnrqW7lHz1BVK4yH
90IzGE0GU4D90OW/L4QkGX0fv3VQbL8KGFKBoF04pXIaSGMStFN4wirutHtQboYv
99X4kbLjVSIuubUpA/v9dUP1TNl8ar+HKUWRM96ijHkFTF3FR0NnZyt44gP5qC0h
i4lUiR6Uo9D6WMFjeRYFF7GolCy/I1SzWBmmOnNhQLO5VxcNG4ldhBcapZeGwE98
m/5lxLIwgFR9ZP8bXdxZTWLC58/LQ2NqOjA9mwIDAQABozIwMDAPBgNVHRMBAf8E
BTADAQH/MB0GA1UdDgQWBBTIJMnuftpfkxNCOkbF0R4xgcKQRjANBgkqhkiG9w0B
AQsFAAOCAQEAhfg/cmXc4Uh90yiXU8jOW8saQjTsq4ZMDQiLfJsNmNNYmHFN0vhv
lJRI1STdy7+GpjS5QbrMjQIxWSS8X8xysE4Rt81IrWmLuao35TRFyoiE1seBQ5sz
p/BxZUe57JvWi9dyzv2df4UfWFdGBhzdr80odZmz4i5VIv6qCKJKsGikcuLpepmp
E/UKnKHeR/dFWsxzA9P2OzHTUNBMOOA2PyAUL49pwoChwJeOWN/zAgwMWLbuHFG0
IN0u8swAmeH98QdvzbhiOatGNpqfTNvQEDc19yVjfXKpBVZQ79WtronYSqrbrUa1
T2zD8bIVP7CdddD/UmpT1SSKh4PJxudy5Q==
-----END CERTIFICATE-----
2 changes: 1 addition & 1 deletion spec/fixtures/test_certs/ca.der.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
195a7e7b1bc29f3d7913a918a44721704d27fa56facea0cd72a8093c7107c283
b1e955819b0d14f64f863adb103c248ddacf2e17bea48d04ee4b57c64814ccc4
38 changes: 38 additions & 0 deletions spec/fixtures/test_certs/es.chain.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-----BEGIN CERTIFICATE-----
MIIDIzCCAgugAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MTIwMAYDVQQDEylFbGFz
dGljIENlcnRpZmljYXRlIFRvb2wgQXV0b2dlbmVyYXRlZCBDQTAeFw0yNDEyMjYy
MjI3MTVaFw0yNTEyMjYyMjI3MTVaMA0xCzAJBgNVBAMTAmVzMIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEArZLZvLSWDK7Ul+AaBnjU81dsfaow8zOjCC5V
V21nXpYzQJoQbuWcvGYxwL7ZDs2ca4Wc8BVCj1NDduHuP7U+QIlUdQpl8kh5a0Zz
36pcFw7UyF51/AzWixJrht/Azzkb5cpZtE22ZK0KhS4oCsjJmTN0EABAsGhDI9/c
MjNrUC7iP0dvfOuzAPp7ufY83h98jKKXUYV24snbbvmqoWI6GQQNSG/sEo1+1UGH
/z07/mVKoBAa5DVoNGvxN0fCE7vW7hkhT8+frJcsYFatAbnf6ql0KzEa8lN9u0gR
hQNM3zcKKsjEMomBzVBc4SV3KXO0d/jGdDtlqsm2oXqlTMdtGwIDAQABo2cwZTAY
BgNVHREEETAPgg1lbGFzdGljc2VhcmNoMAkGA1UdEwQCMAAwHQYDVR0OBBYEFFQU
K+6Cg2kExRj1xSDzEi4kkgKXMB8GA1UdIwQYMBaAFMgkye5+2l+TE0I6RsXRHjGB
wpBGMA0GCSqGSIb3DQEBCwUAA4IBAQB6cZ7IrDzcAoOZgAt9RlOe2yzQeH+alttp
CSQVINjJotS1WvmtqjBB6ArqLpXIGU89TZsktNe/NQJzgYSaMnlIuHVLFdxJYmwU
T1cP6VC/brmqP/dd5y7VWE7Lp+Wd5CxKl/WY+9chmgc+a1fW/lnPEJJ6pca1Bo8b
byIL0yY2IUv4R2eh1IyQl9oGH1GOPLgO7cY04eajxYcOVA2eDSItoyDtrJfkFP/P
UXtC1JAkvWKuujFEiBj0AannhroWlp3gvChhBwCuCAU0KXD6g8BE8tn6oT1+FW7J
avSfHxAe+VHtYhF8sJ8jrdm0d7E4GKS9UR/pkLAL1JuRdJ1VkPx3
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIDFTCCAf2gAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MTIwMAYDVQQDEylFbGFz
dGljIENlcnRpZmljYXRlIFRvb2wgQXV0b2dlbmVyYXRlZCBDQTAeFw0yNDEyMjYy
MjI3MTVaFw0yNTEyMjYyMjI3MTVaMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlm
aWNhdGUgVG9vbCBBdXRvZ2VuZXJhdGVkIENBMIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEArUe66xG4Y2zO13gRC+rBwyvxe+c01pqV6ukw6isIbJIQWs1/
QfEMhUwYwKs6/UXxK+VwardcA2zYwngXbGGEtms+mpUfH5CdJnrqW7lHz1BVK4yH
90IzGE0GU4D90OW/L4QkGX0fv3VQbL8KGFKBoF04pXIaSGMStFN4wirutHtQboYv
99X4kbLjVSIuubUpA/v9dUP1TNl8ar+HKUWRM96ijHkFTF3FR0NnZyt44gP5qC0h
i4lUiR6Uo9D6WMFjeRYFF7GolCy/I1SzWBmmOnNhQLO5VxcNG4ldhBcapZeGwE98
m/5lxLIwgFR9ZP8bXdxZTWLC58/LQ2NqOjA9mwIDAQABozIwMDAPBgNVHRMBAf8E
BTADAQH/MB0GA1UdDgQWBBTIJMnuftpfkxNCOkbF0R4xgcKQRjANBgkqhkiG9w0B
AQsFAAOCAQEAhfg/cmXc4Uh90yiXU8jOW8saQjTsq4ZMDQiLfJsNmNNYmHFN0vhv
lJRI1STdy7+GpjS5QbrMjQIxWSS8X8xysE4Rt81IrWmLuao35TRFyoiE1seBQ5sz
p/BxZUe57JvWi9dyzv2df4UfWFdGBhzdr80odZmz4i5VIv6qCKJKsGikcuLpepmp
E/UKnKHeR/dFWsxzA9P2OzHTUNBMOOA2PyAUL49pwoChwJeOWN/zAgwMWLbuHFG0
IN0u8swAmeH98QdvzbhiOatGNpqfTNvQEDc19yVjfXKpBVZQ79WtronYSqrbrUa1
T2zD8bIVP7CdddD/UmpT1SSKh4PJxudy5Q==
-----END CERTIFICATE-----
35 changes: 17 additions & 18 deletions spec/fixtures/test_certs/es.crt
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDNjCCAh6gAwIBAgIUF9wE+oqGSbm4UVn1y9gEjzyaJFswDQYJKoZIhvcNAQEL
BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l
cmF0ZWQgQ0EwHhcNMjEwODEyMDUxNTI3WhcNMjQwODExMDUxNTI3WjANMQswCQYD
VQQDEwJlczCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAK2S2by0lgyu
1JfgGgZ41PNXbH2qMPMzowguVVdtZ16WM0CaEG7lnLxmMcC+2Q7NnGuFnPAVQo9T
Q3bh7j+1PkCJVHUKZfJIeWtGc9+qXBcO1MhedfwM1osSa4bfwM85G+XKWbRNtmSt
CoUuKArIyZkzdBAAQLBoQyPf3DIza1Au4j9Hb3zrswD6e7n2PN4ffIyil1GFduLJ
2275qqFiOhkEDUhv7BKNftVBh/89O/5lSqAQGuQ1aDRr8TdHwhO71u4ZIU/Pn6yX
LGBWrQG53+qpdCsxGvJTfbtIEYUDTN83CirIxDKJgc1QXOEldylztHf4xnQ7ZarJ
tqF6pUzHbRsCAwEAAaNnMGUwHQYDVR0OBBYEFFQUK+6Cg2kExRj1xSDzEi4kkgKX
MB8GA1UdIwQYMBaAFMgkye5+2l+TE0I6RsXRHjGBwpBGMBgGA1UdEQQRMA+CDWVs
YXN0aWNzZWFyY2gwCQYDVR0TBAIwADANBgkqhkiG9w0BAQsFAAOCAQEAinaknZIc
7xtQNwUwa+kdET+I4lMz+TJw9vTjGKPJqe082n81ycKU5b+a/OndG90z+dTwhShW
f0oZdIe/1rDCdiRU4ceCZA4ybKrFDIbW8gOKZOx9rsgEx9XNELj4ocZTBqxjQmNE
Ho91fli5aEm0EL2vJgejh4hcfDeElQ6go9gtvAHQ57XEADQSenvt69jOICOupnS+
LSjDVhv/VLi3CAip0B+lD5fX/DVQdrJ62eRGuQYxoouE3saCO58qUUrKB39yD9KA
qRA/sVxyLogxaU+5dLfc0NJdOqSzStxQ2vdMvAWo9tZZ2UBGFrk5SdwCQe7Yv5mX
qi02i4q6meHGcw==
MIIDIzCCAgugAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MTIwMAYDVQQDEylFbGFz
dGljIENlcnRpZmljYXRlIFRvb2wgQXV0b2dlbmVyYXRlZCBDQTAeFw0yNDEyMjYy
MjI3MTVaFw0yNTEyMjYyMjI3MTVaMA0xCzAJBgNVBAMTAmVzMIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEArZLZvLSWDK7Ul+AaBnjU81dsfaow8zOjCC5V
V21nXpYzQJoQbuWcvGYxwL7ZDs2ca4Wc8BVCj1NDduHuP7U+QIlUdQpl8kh5a0Zz
36pcFw7UyF51/AzWixJrht/Azzkb5cpZtE22ZK0KhS4oCsjJmTN0EABAsGhDI9/c
MjNrUC7iP0dvfOuzAPp7ufY83h98jKKXUYV24snbbvmqoWI6GQQNSG/sEo1+1UGH
/z07/mVKoBAa5DVoNGvxN0fCE7vW7hkhT8+frJcsYFatAbnf6ql0KzEa8lN9u0gR
hQNM3zcKKsjEMomBzVBc4SV3KXO0d/jGdDtlqsm2oXqlTMdtGwIDAQABo2cwZTAY
BgNVHREEETAPgg1lbGFzdGljc2VhcmNoMAkGA1UdEwQCMAAwHQYDVR0OBBYEFFQU
K+6Cg2kExRj1xSDzEi4kkgKXMB8GA1UdIwQYMBaAFMgkye5+2l+TE0I6RsXRHjGB
wpBGMA0GCSqGSIb3DQEBCwUAA4IBAQB6cZ7IrDzcAoOZgAt9RlOe2yzQeH+alttp
CSQVINjJotS1WvmtqjBB6ArqLpXIGU89TZsktNe/NQJzgYSaMnlIuHVLFdxJYmwU
T1cP6VC/brmqP/dd5y7VWE7Lp+Wd5CxKl/WY+9chmgc+a1fW/lnPEJJ6pca1Bo8b
byIL0yY2IUv4R2eh1IyQl9oGH1GOPLgO7cY04eajxYcOVA2eDSItoyDtrJfkFP/P
UXtC1JAkvWKuujFEiBj0AannhroWlp3gvChhBwCuCAU0KXD6g8BE8tn6oT1+FW7J
avSfHxAe+VHtYhF8sJ8jrdm0d7E4GKS9UR/pkLAL1JuRdJ1VkPx3
-----END CERTIFICATE-----
15 changes: 15 additions & 0 deletions spec/fixtures/test_certs/renew.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env bash

set -e
cd "$(dirname "$0")"

openssl x509 -x509toreq -in ca.crt -copy_extensions copyall -signkey ca.key -out ca.csr
openssl x509 -req -copy_extensions copyall -days 365 -in ca.csr -set_serial 0x01 -signkey ca.key -out ca.crt && rm ca.csr
openssl x509 -in ca.crt -outform der | sha256sum | awk '{print $1}' > ca.der.sha256

openssl x509 -x509toreq -in es.crt -copy_extensions copyall -signkey es.key -out es.csr
openssl x509 -req -copy_extensions copyall -days 365 -in es.csr -set_serial 0x01 -CA ca.crt -CAkey ca.key -out es.crt && rm es.csr
cat es.crt ca.crt > es.chain.crt

# output ISO8601 timestamp to file
date -Iseconds > GENERATED_AT
105 changes: 102 additions & 3 deletions spec/inputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -653,11 +653,28 @@ def synchronize_method!(object, method_name)
context 'if the `docinfo_target` exist but is not of type hash' do
let(:config) { base_config.merge 'docinfo' => true, "docinfo_target" => 'metadata_with_string' }
let(:do_register) { false }
let(:mock_queue) { double('Queue', :<< => nil) }
let(:hit) { response.dig('hits', 'hits').first }

it 'emits a tagged event with JSON-serialized event in [event][original]' do
allow(plugin).to receive(:logger).and_return(double('Logger').as_null_object)

it 'raises an exception if the `docinfo_target` exist but is not of type hash' do
expect(client).not_to receive(:clear_scroll)
plugin.register
expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/)
plugin.run(mock_queue)

expect(mock_queue).to have_received(:<<) do |event|
expect(event).to be_a_kind_of LogStash::Event

expect(event.get('tags')).to include("_elasticsearch_input_failure")
expect(event.get('[event][original]')).to be_a_kind_of String
expect(JSON.load(event.get('[event][original]'))).to eq hit
end

expect(plugin.logger)
.to have_received(:warn).with(
a_string_including("Event creation error, original data now in [event][original] field"),
a_hash_including(:message => a_string_including('unable to merge docinfo fields into docinfo_target=`metadata_with_string`'),
:data => a_string_including('"_id":"C5b2xLQwTZa76jBmHIbwHQ"')))
end

end
Expand Down Expand Up @@ -1235,6 +1252,88 @@ def wait_receive_request
end
end

context '#push_hit' do
let(:config) do
{
'docinfo' => true, # include ids
'docinfo_target' => '[@metadata][docinfo]'
}
end

let(:hit) do
JSON.load(<<~EOJSON)
{
"_index" : "test_bulk_index_2",
"_type" : "_doc",
"_id" : "sHe6A3wBesqF7ydicQvG",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2021-09-20T15:02:02.557Z",
"message" : "ping",
"@version" : "17",
"sequence" : 7,
"host" : {
"name" : "maybe.local",
"ip" : "127.0.0.1"
}
}
}
EOJSON
end

let(:mock_queue) { double('queue', :<< => nil) }

it 'pushes a generated event to the queue' do
plugin.send(:push_hit, hit, mock_queue)
expect(mock_queue).to have_received(:<<) do |event|
expect(event).to be_a_kind_of LogStash::Event

# fields overriding defaults
expect(event.timestamp.to_s).to eq("2021-09-20T15:02:02.557Z")
expect(event.get('@version')).to eq("17")

# structure from hit's _source
expect(event.get('message')).to eq("ping")
expect(event.get('sequence')).to eq(7)
expect(event.get('[host][name]')).to eq("maybe.local")
expect(event.get('[host][ip]')).to eq("127.0.0.1")

# docinfo fields
expect(event.get('[@metadata][docinfo][_index]')).to eq("test_bulk_index_2")
expect(event.get('[@metadata][docinfo][_type]')).to eq("_doc")
expect(event.get('[@metadata][docinfo][_id]')).to eq("sHe6A3wBesqF7ydicQvG")
end
end

context 'when event creation fails' do
before(:each) do
allow(plugin).to receive(:logger).and_return(double('Logger').as_null_object)

allow(plugin.event_factory).to receive(:new_event).and_call_original
allow(plugin.event_factory).to receive(:new_event).with(a_hash_including hit['_source']).and_raise(RuntimeError, 'intentional')
end

it 'pushes a tagged event containing a JSON-encoded hit in [event][original]' do
plugin.send(:push_hit, hit, mock_queue)

expect(mock_queue).to have_received(:<<) do |event|
expect(event).to be_a_kind_of LogStash::Event

expect(event.get('tags')).to include("_elasticsearch_input_failure")
expect(event.get('[event][original]')).to be_a_kind_of String
expect(JSON.load(event.get('[event][original]'))).to eq hit
end

expect(plugin.logger)
.to have_received(:warn).with(
a_string_including("Event creation error, original data now in [event][original] field"),
a_hash_including(:message => a_string_including('intentional'),
:data => a_string_including('"_id":"sHe6A3wBesqF7ydicQvG"')))

end
end
end

# @note can be removed once we depends on elasticsearch gem >= 6.x
def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client
client.transport.respond_to?(:transport) ? client.transport.transport : client.transport
Expand Down
2 changes: 1 addition & 1 deletion spec/inputs/integration/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
require "logstash/inputs/elasticsearch"
require_relative "../../../spec/es_helper"

describe LogStash::Inputs::Elasticsearch, :integration => true do
describe LogStash::Inputs::Elasticsearch do

SECURE_INTEGRATION = ENV['SECURE_INTEGRATION'].eql? 'true'

Expand Down