Skip to content

Commit

Permalink
Enable the plugin to assume a role
Browse files Browse the repository at this point in the history
  • Loading branch information
sihil committed Jan 2, 2019
1 parent 935d445 commit 5813e32
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ This are the properties you can configure and what are the default values:
* **default value**: `nil`
* `profile`: The AWS profile name for authentication. This ensures that the `~/.aws/credentials` AWS auth provider is used. By default this is empty and the default chain will be used.
* **required**: false
* **default value**: `""`
* **default value**: `""`
* `role_arn`: The AWS role to assume. This can be used, for example, to access a Kinesis stream in a different AWS
account. This role will be assumed after the default credentials or profile credentials are created. By default
this is empty and a role will not be assumed.
* **required**: false
* **default value**: `""`
* `initial_position_in_stream`: The value for initialPositionInStream. Accepts "TRIM_HORIZON" or "LATEST".
* **required**: false
* **default value**: `"TRIM_HORIZON"`
Expand Down
16 changes: 15 additions & 1 deletion lib/logstash/inputs/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
# Select AWS profile for input
config :profile, :validate => :string

# Assume a different role using STS, for example if the stream is in a different AWS account
config :role_arn, :validate => :string

# Select initial_position_in_stream. Accepts TRIM_HORIZON or LATEST
config :initial_position_in_stream, :validate => ["TRIM_HORIZON", "LATEST"], :default => "TRIM_HORIZON"

Expand Down Expand Up @@ -85,6 +88,15 @@ def register
else
creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new
end

# If a role ARN is set then assume the role as a new layer over the credentials already created
unless @role_arn.nil?
session_id = "worker" + worker_id
kinesis_creds = com.amazonaws.auth::STSAssumeRoleSessionCredentialsProvider.new(creds, @role_arn, session_id)
else
kinesis_creds = creds
end

initial_position_in_stream = if @initial_position_in_stream == "TRIM_HORIZON"
KCL::InitialPositionInStream::TRIM_HORIZON
else
Expand All @@ -94,7 +106,9 @@ def register
@kcl_config = KCL::KinesisClientLibConfiguration.new(
@application_name,
@kinesis_stream_name,
creds,
kinesis_creds, # credential provider for accessing the kinesis stream
creds, # credential provider for creating / accessing the dynamo table
creds, # credential provider for cloudwatch metrics
worker_id).
withInitialPositionInStream(initial_position_in_stream).
withRegionName(@region)
Expand Down
1 change: 1 addition & 0 deletions logstash-input-kinesis.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Gem::Specification.new do |spec|

spec.requirements << "jar 'com.amazonaws:amazon-kinesis-client', '1.9.2'"
spec.requirements << "jar 'com.amazonaws:aws-java-sdk-core', '1.11.414'"
spec.requirements << "jar 'com.amazonaws:aws-java-sdk-sts', '1.11.414'"

spec.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"

Expand Down
20 changes: 20 additions & 0 deletions spec/inputs/kinesis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@
"profile" => "my-aws-profile"
}}

# Config hash to test assume role provider if role_arn is specified
let(:config_with_role_arn) {{
"application_name" => "my-processor",
"kinesis_stream_name" => "run-specs",
"codec" => codec,
"metrics" => metrics,
"checkpoint_interval_seconds" => 120,
"region" => "ap-southeast-1",
"role_arn" => "arn:aws:iam::???????????:role/my-role"
}}

# other config with LATEST as initial_position_in_stream
let(:config_with_latest) {{
"application_name" => "my-processor",
Expand Down Expand Up @@ -110,6 +121,15 @@
expect(kinesis_with_profile.kcl_config.get_kinesis_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.profile.ProfileCredentialsProvider")
end

subject!(:kinesis_with_role_arn) { LogStash::Inputs::Kinesis.new(config_with_role_arn) }

it "uses STS for accessing the kinesis stream if role_arn is specified" do
kinesis_with_role_arn.register
expect(kinesis_with_role_arn.kcl_config.get_kinesis_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider")
expect(kinesis_with_role_arn.kcl_config.get_dynamo_db_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
expect(kinesis_with_role_arn.kcl_config.get_cloud_watch_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
end

subject!(:kinesis_with_latest) { LogStash::Inputs::Kinesis.new(config_with_latest) }

it "configures the KCL" do
Expand Down

0 comments on commit 5813e32

Please sign in to comment.