diff --git a/README.md b/README.md index fd70fd6..491e4da 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,15 @@ This are the properties you can configure and what are the default values: * **default value**: `nil` * `profile`: The AWS profile name for authentication. This ensures that the `~/.aws/credentials` AWS auth provider is used. By default this is empty and the default chain will be used. * **required**: false - * **default value**: `""` + * **default value**: `""` +* `role_arn`: The AWS role to assume. This can be used, for example, to access a Kinesis stream in a different AWS +account. This role will be assumed after the default credentials or profile credentials are created. By default +this is empty and a role will not be assumed. + * **required**: false + * **default value**: `""` +* `role_session_name`: Session name to use when assuming an IAM role. This is recorded in CloudTrail logs for example. + * **required**: false + * **default value**: `"logstash"` * `initial_position_in_stream`: The value for initialPositionInStream. Accepts "TRIM_HORIZON" or "LATEST". * **required**: false * **default value**: `"TRIM_HORIZON"` diff --git a/lib/logstash/inputs/kinesis.rb b/lib/logstash/inputs/kinesis.rb index 4d983ed..88e0dda 100644 --- a/lib/logstash/inputs/kinesis.rb +++ b/lib/logstash/inputs/kinesis.rb @@ -55,6 +55,14 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base # Select AWS profile for input config :profile, :validate => :string + # The AWS IAM Role to assume, if any. + # This is used to generate temporary credentials typically for cross-account access. + # See https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html for more information. + config :role_arn, :validate => :string + + # Session name to use when assuming an IAM role + config :role_session_name, :validate => :string, :default => "logstash" + # Select initial_position_in_stream. Accepts TRIM_HORIZON or LATEST config :initial_position_in_stream, :validate => ["TRIM_HORIZON", "LATEST"], :default => "TRIM_HORIZON" @@ -85,6 +93,14 @@ def register else creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new end + + # If a role ARN is set then assume the role as a new layer over the credentials already created + unless @role_arn.nil? + kinesis_creds = com.amazonaws.auth::STSAssumeRoleSessionCredentialsProvider.new(creds, @role_arn, @role_session_name) + else + kinesis_creds = creds + end + initial_position_in_stream = if @initial_position_in_stream == "TRIM_HORIZON" KCL::InitialPositionInStream::TRIM_HORIZON else @@ -94,7 +110,9 @@ def register @kcl_config = KCL::KinesisClientLibConfiguration.new( @application_name, @kinesis_stream_name, - creds, + kinesis_creds, # credential provider for accessing the kinesis stream + creds, # credential provider for creating / accessing the dynamo table + creds, # credential provider for cloudwatch metrics worker_id). withInitialPositionInStream(initial_position_in_stream). withRegionName(@region) diff --git a/logstash-input-kinesis.gemspec b/logstash-input-kinesis.gemspec index 4f6b7a6..9ccba3b 100644 --- a/logstash-input-kinesis.gemspec +++ b/logstash-input-kinesis.gemspec @@ -24,6 +24,7 @@ Gem::Specification.new do |spec| spec.requirements << "jar 'com.amazonaws:amazon-kinesis-client', '1.9.2'" spec.requirements << "jar 'com.amazonaws:aws-java-sdk-core', '1.11.414'" + spec.requirements << "jar 'com.amazonaws:aws-java-sdk-sts', '1.11.414'" spec.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" diff --git a/spec/inputs/kinesis_spec.rb b/spec/inputs/kinesis_spec.rb index 2ff497d..e873c83 100644 --- a/spec/inputs/kinesis_spec.rb +++ b/spec/inputs/kinesis_spec.rb @@ -26,6 +26,17 @@ "profile" => "my-aws-profile" }} + # Config hash to test assume role provider if role_arn is specified + let(:config_with_role_arn) {{ + "application_name" => "my-processor", + "kinesis_stream_name" => "run-specs", + "codec" => codec, + "metrics" => metrics, + "checkpoint_interval_seconds" => 120, + "region" => "ap-southeast-1", + "role_arn" => "arn:aws:iam::???????????:role/my-role" + }} + # other config with LATEST as initial_position_in_stream let(:config_with_latest) {{ "application_name" => "my-processor", @@ -110,6 +121,15 @@ expect(kinesis_with_profile.kcl_config.get_kinesis_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.profile.ProfileCredentialsProvider") end + subject!(:kinesis_with_role_arn) { LogStash::Inputs::Kinesis.new(config_with_role_arn) } + + it "uses STS for accessing the kinesis stream if role_arn is specified" do + kinesis_with_role_arn.register + expect(kinesis_with_role_arn.kcl_config.get_kinesis_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider") + expect(kinesis_with_role_arn.kcl_config.get_dynamo_db_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.DefaultAWSCredentialsProviderChain") + expect(kinesis_with_role_arn.kcl_config.get_cloud_watch_credentials_provider.getClass.to_s).to eq("com.amazonaws.auth.DefaultAWSCredentialsProviderChain") + end + subject!(:kinesis_with_latest) { LogStash::Inputs::Kinesis.new(config_with_latest) } it "configures the KCL" do