diff --git a/README.md b/README.md index a92f3f7..5835058 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,8 @@ Production Usage / Inside EC2 "logs:CreateLogStream", "logs:DescribeLogGroups", "logs:DescribeLogStreams", - "logs:PutLogEvents" + "logs:PutLogEvents", + "logs:PutRetentionPolicy" ], "Effect": "Allow", "Resource": "*" @@ -87,7 +88,7 @@ The first time a message is received from a given container, its Log Group and L By default, each Log Stream is named after its associated container, and each stream's Log Group is the hostname of the container running Logspout. These two values can be overridden by setting the Environment variables `LOGSPOUT_GROUP` and `LOGSPOUT_STREAM` on the Logspout container, or on any individual log-producing container (container-specific values take precendence). In this way, precomputed values can be set for each container. -Furthermore, when the Log Group and Log Stream names are computed, these Envinronment-based values are passed through Go's standard [template engine][3], and provided with the following render context: +Furthermore, when the Log Group name, Log Stream name and log retention are computed, these Environment-based values are passed through Go's standard [template engine][3], and provided with the following render context: type RenderContext struct { @@ -117,7 +118,13 @@ So you may use the `{{}}` template-syntax to build complex Log Group and Log Str LOGSPOUT_GROUP={{.Lbl "com.mycompany.loggroup"}} LOGSPOUT_STREAM={{.Lbl "com.mycompany.logstream"}} -Complex settings like this are most easily applied to contaners by putting them into a separate "environment file", and passing its path to docker at runtime: `docker run --env-file /path/to/file [...]` + # Set the logs to only be retained for a period of time (defaults to retaining forever): + # Valid values are: 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, and 3653. + # The retention policy will only be set when a log group is created, if a log group already exists its retention + # policy will not be updated. + LOGSPOUT_CLOUDWATCH_RETENTION_DAYS={{.Labels.LOG_RETENTION_DAYS}} + +Complex settings like this are most easily applied to containers by putting them into a separate "environment file", and passing its path to docker at runtime: `docker run --env-file /path/to/file [...]` ---------------- diff --git a/cloudwatch.go b/cloudwatch.go index 54ed3c9..01f6f81 100644 --- a/cloudwatch.go +++ b/cloudwatch.go @@ -3,6 +3,7 @@ package cloudwatch import ( "log" "os" + "strconv" "strings" "time" @@ -24,10 +25,11 @@ type CloudwatchAdapter struct { Ec2Region string Ec2Instance string - client *docker.Client - batcher *CloudwatchBatcher // batches up messages by log group and stream - groupnames map[string]string // maps container names to log groups - streamnames map[string]string // maps container names to log streams + client *docker.Client + batcher *CloudwatchBatcher // batches up messages by log group and stream + groupnames map[string]string // maps container names to log groups + streamnames map[string]string // maps container names to log streams + retentiondays map[string]int64 // maps log groups to retention days } // NewCloudwatchAdapter creates a CloudwatchAdapter for the current region. @@ -49,13 +51,14 @@ func NewCloudwatchAdapter(route *router.Route) (router.LogAdapter, error) { return nil, err } adapter := CloudwatchAdapter{ - Route: route, - OsHost: hostname, - Ec2Instance: ec2info.InstanceID, - Ec2Region: ec2info.Region, - client: client, - groupnames: map[string]string{}, - streamnames: map[string]string{}, + Route: route, + OsHost: hostname, + Ec2Instance: ec2info.InstanceID, + Ec2Region: ec2info.Region, + client: client, + groupnames: map[string]string{}, + streamnames: map[string]string{}, + retentiondays: map[string]int64{}, } adapter.batcher = NewCloudwatchBatcher(&adapter) return &adapter, nil @@ -94,6 +97,17 @@ func (a *CloudwatchAdapter) Stream(logstream chan *router.Message) { streamName = a.renderEnvValue(`LOGSPOUT_STREAM`, &context, context.Name) a.groupnames[m.Container.ID] = groupName // cache the group name a.streamnames[m.Container.ID] = streamName // and the stream name + + retentionDays := a.renderEnvValue(`LOGSPOUT_CLOUDWATCH_RETENTION_DAYS`, &context, "") + + if (retentionDays != "") { + retentionDaysInt, err := strconv.ParseInt(retentionDays, 10, 64) + if err == nil { + a.retentiondays[groupName] = retentionDaysInt + } else { + log.Printf("cloudwatch: error parsing retention days of '%s' to a int64: %s", retentionDays, err) + } + } } a.batcher.Input <- CloudwatchMessage{ Message: m.Data, diff --git a/uploader.go b/uploader.go index cfafee9..86e307e 100644 --- a/uploader.go +++ b/uploader.go @@ -15,10 +15,11 @@ import ( // CloudwatchUploader receieves CloudwatchBatches on its input channel, // and sends them on to the AWS Cloudwatch Logs endpoint. type CloudwatchUploader struct { - Input chan CloudwatchBatch - svc *cloudwatchlogs.CloudWatchLogs - tokens map[string]string - debugSet bool + Input chan CloudwatchBatch + adapter *CloudwatchAdapter + svc *cloudwatchlogs.CloudWatchLogs + tokens map[string]string + debugSet bool } func NewCloudwatchUploader(adapter *CloudwatchAdapter) *CloudwatchUploader { @@ -38,9 +39,10 @@ func NewCloudwatchUploader(adapter *CloudwatchAdapter) *CloudwatchUploader { region) } uploader := CloudwatchUploader{ - Input: make(chan CloudwatchBatch), - tokens: map[string]string{}, + Input: make(chan CloudwatchBatch), + tokens: map[string]string{}, debugSet: debugSet, + adapter: adapter, svc: cloudwatchlogs.New(session.New(), &aws.Config{Region: aws.String(region)}), } @@ -123,6 +125,13 @@ func (u *CloudwatchUploader) getSequenceToken(msg CloudwatchMessage) (*string, if err != nil { return nil, err } + + if retentionDays, retentionDaysConfigured := u.adapter.retentiondays[group]; retentionDaysConfigured { + err = u.createGroupRetentionPolicy(group, retentionDays) + if err != nil { + return nil, err + } + } } params := &cloudwatchlogs.DescribeLogStreamsInput{ LogGroupName: aws.String(group), @@ -174,6 +183,18 @@ func (u *CloudwatchUploader) createGroup(group string) error { return nil } +func (u *CloudwatchUploader) createGroupRetentionPolicy(group string, retentionInDays int64) error { + u.log("Creating group retention policy for %s, days: %d...", group, retentionInDays) + params := &cloudwatchlogs.PutRetentionPolicyInput{ + LogGroupName: aws.String(group), + RetentionInDays: aws.Int64(retentionInDays), + } + if _, err := u.svc.PutRetentionPolicy(params); err != nil { + return err + } + return nil +} + func (u *CloudwatchUploader) createStream(group, stream string) error { u.log("Creating stream for group %s, stream %s...", group, stream) params := &cloudwatchlogs.CreateLogStreamInput{