Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable the plugin to assume a role #40

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,15 @@ This are the properties you can configure and what are the default values:
* **default value**: `nil`
* `profile`: The AWS profile name for authentication. This ensures that the `~/.aws/credentials` AWS auth provider is used. By default this is empty and the default chain will be used.
* **required**: false
* **default value**: `""`
* **default value**: `""`
* `role_arn`: The AWS role to assume. This can be used, for example, to access a Kinesis stream in a different AWS
account. This role will be assumed after the default credentials or profile credentials are created. By default
this is empty and a role will not be assumed.
* **required**: false
* **default value**: `""`
* `role_session_name`: Session name to use when assuming an IAM role. This is recorded in CloudTrail logs for example.
* **required**: false
* **default value**: `"logstash"`
* `initial_position_in_stream`: The value for initialPositionInStream. Accepts "TRIM_HORIZON" or "LATEST".
* **required**: false
* **default value**: `"TRIM_HORIZON"`
Expand Down
20 changes: 19 additions & 1 deletion lib/logstash/inputs/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
# Select AWS profile for input
config :profile, :validate => :string

# The AWS IAM Role to assume, if any.
# This is used to generate temporary credentials typically for cross-account access.
# See https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html for more information.
config :role_arn, :validate => :string

# Session name to use when assuming an IAM role
config :role_session_name, :validate => :string, :default => "logstash"

# Select initial_position_in_stream. Accepts TRIM_HORIZON or LATEST
config :initial_position_in_stream, :validate => ["TRIM_HORIZON", "LATEST"], :default => "TRIM_HORIZON"

Expand Down Expand Up @@ -85,6 +93,14 @@ def register
else
creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new
end

# If a role ARN is set then assume the role as a new layer over the credentials already created
unless @role_arn.nil?
kinesis_creds = com.amazonaws.auth::STSAssumeRoleSessionCredentialsProvider.new(creds, @role_arn, @role_session_name)
else
kinesis_creds = creds
end

initial_position_in_stream = if @initial_position_in_stream == "TRIM_HORIZON"
KCL::InitialPositionInStream::TRIM_HORIZON
else
Expand All @@ -94,7 +110,9 @@ def register
@kcl_config = KCL::KinesisClientLibConfiguration.new(
@application_name,
@kinesis_stream_name,
creds,
kinesis_creds, # credential provider for accessing the kinesis stream
creds, # credential provider for creating / accessing the dynamo table
creds, # credential provider for cloudwatch metrics
worker_id).
withInitialPositionInStream(initial_position_in_stream).
withRegionName(@region)
Expand Down
1 change: 1 addition & 0 deletions logstash-input-kinesis.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Gem::Specification.new do |spec|

spec.requirements << "jar 'com.amazonaws:amazon-kinesis-client', '1.9.2'"
spec.requirements << "jar 'com.amazonaws:aws-java-sdk-core', '1.11.414'"
spec.requirements << "jar 'com.amazonaws:aws-java-sdk-sts', '1.11.414'"

spec.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"

Expand Down
20 changes: 20 additions & 0 deletions spec/inputs/kinesis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@
"profile" => "my-aws-profile"
}}

# Config hash to test assume role provider if role_arn is specified
let(:config_with_role_arn) {{
"application_name" => "my-processor",
"kinesis_stream_name" => "run-specs",
"codec" => codec,
"metrics" => metrics,
"checkpoint_interval_seconds" => 120,
"region" => "ap-southeast-1",
"role_arn" => "arn:aws:iam::???????????:role/my-role"
}}

# other config with LATEST as initial_position_in_stream
let(:config_with_latest) {{
"application_name" => "my-processor",
Expand Down Expand Up @@ -110,6 +121,15 @@
expect(kinesis_with_profile.kcl_config.get_kinesis_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.profile.ProfileCredentialsProvider")
end

subject!(:kinesis_with_role_arn) { LogStash::Inputs::Kinesis.new(config_with_role_arn) }

it "uses STS for accessing the kinesis stream if role_arn is specified" do
kinesis_with_role_arn.register
expect(kinesis_with_role_arn.kcl_config.get_kinesis_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider")
expect(kinesis_with_role_arn.kcl_config.get_dynamo_db_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
expect(kinesis_with_role_arn.kcl_config.get_cloud_watch_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
end

subject!(:kinesis_with_latest) { LogStash::Inputs::Kinesis.new(config_with_latest) }

it "configures the KCL" do
Expand Down