Add environment variable to specify retention days of log group. #16

Open · wants to merge 1 commit into master
README.md: 13 changes (10 additions, 3 deletions)
@@ -64,7 +64,8 @@ Production Usage / Inside EC2
"logs:CreateLogStream",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:PutLogEvents"
"logs:PutLogEvents",
"logs:PutRetentionPolicy"
],
"Effect": "Allow",
"Resource": "*"
@@ -87,7 +88,7 @@ The first time a message is received from a given container, its Log Group and L

By default, each Log Stream is named after its associated container, and each stream's Log Group is the hostname of the container running Logspout. These two values can be overridden by setting the Environment variables `LOGSPOUT_GROUP` and `LOGSPOUT_STREAM` on the Logspout container, or on any individual log-producing container (container-specific values take precedence). In this way, precomputed values can be set for each container.

Furthermore, when the Log Group and Log Stream names are computed, these Envinronment-based values are passed through Go's standard [template engine][3], and provided with the following render context:
Furthermore, when the Log Group name, Log Stream name and log retention are computed, these Environment-based values are passed through Go's standard [template engine][3], and provided with the following render context:


type RenderContext struct {
@@ -117,7 +118,13 @@ So you may use the `{{}}` template-syntax to build complex Log Group and Log Str
LOGSPOUT_GROUP={{.Lbl "com.mycompany.loggroup"}}
LOGSPOUT_STREAM={{.Lbl "com.mycompany.logstream"}}

Complex settings like this are most easily applied to contaners by putting them into a separate "environment file", and passing its path to docker at runtime: `docker run --env-file /path/to/file [...]`
# Set the logs to only be retained for a period of time (defaults to retaining forever):
# Valid values are: 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, and 3653.
# The retention policy is only set when a log group is created; if a log group already exists, its
# retention policy will not be updated.
LOGSPOUT_CLOUDWATCH_RETENTION_DAYS={{.Labels.LOG_RETENTION_DAYS}}

Complex settings like this are most easily applied to containers by putting them into a separate "environment file", and passing its path to docker at runtime: `docker run --env-file /path/to/file [...]`
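For example, a hypothetical environment file combining these settings might look like the following (the file name and the `LOG_RETENTION_DAYS` label name are illustrative, not required by the adapter):

    # logspout.env (hypothetical)
    LOGSPOUT_GROUP={{.Lbl "com.mycompany.loggroup"}}
    LOGSPOUT_STREAM={{.Lbl "com.mycompany.logstream"}}
    LOGSPOUT_CLOUDWATCH_RETENTION_DAYS={{.Labels.LOG_RETENTION_DAYS}}

    # Start Logspout with the file, then label each log-producing container:
    docker run --env-file ./logspout.env [...]
    docker run --label LOG_RETENTION_DAYS=30 [...]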


----------------
cloudwatch.go: 36 changes (25 additions, 11 deletions)
@@ -3,6 +3,7 @@ package cloudwatch
import (
"log"
"os"
"strconv"
"strings"
"time"

@@ -24,10 +25,11 @@ type CloudwatchAdapter struct {
Ec2Region string
Ec2Instance string

client *docker.Client
batcher *CloudwatchBatcher // batches up messages by log group and stream
groupnames map[string]string // maps container names to log groups
streamnames map[string]string // maps container names to log streams
client *docker.Client
batcher *CloudwatchBatcher // batches up messages by log group and stream
groupnames map[string]string // maps container names to log groups
streamnames map[string]string // maps container names to log streams
retentiondays map[string]int64 // maps log groups to retention days
}

// NewCloudwatchAdapter creates a CloudwatchAdapter for the current region.
@@ -49,13 +51,14 @@ func NewCloudwatchAdapter(route *router.Route) (router.LogAdapter, error) {
return nil, err
}
adapter := CloudwatchAdapter{
Route: route,
OsHost: hostname,
Ec2Instance: ec2info.InstanceID,
Ec2Region: ec2info.Region,
client: client,
groupnames: map[string]string{},
streamnames: map[string]string{},
Route: route,
OsHost: hostname,
Ec2Instance: ec2info.InstanceID,
Ec2Region: ec2info.Region,
client: client,
groupnames: map[string]string{},
streamnames: map[string]string{},
retentiondays: map[string]int64{},
}
adapter.batcher = NewCloudwatchBatcher(&adapter)
return &adapter, nil
@@ -94,6 +97,17 @@ func (a *CloudwatchAdapter) Stream(logstream chan *router.Message) {
streamName = a.renderEnvValue(`LOGSPOUT_STREAM`, &context, context.Name)
a.groupnames[m.Container.ID] = groupName // cache the group name
a.streamnames[m.Container.ID] = streamName // and the stream name

retentionDays := a.renderEnvValue(`LOGSPOUT_CLOUDWATCH_RETENTION_DAYS`, &context, "")

if retentionDays != "" {
retentionDaysInt, err := strconv.ParseInt(retentionDays, 10, 64)
if err == nil {
a.retentiondays[groupName] = retentionDaysInt
} else {
log.Printf("cloudwatch: error parsing retention days of '%s' to a int64: %s", retentionDays, err)
}
}
}
a.batcher.Input <- CloudwatchMessage{
Message: m.Data,
uploader.go: 33 changes (27 additions, 6 deletions)
@@ -15,10 +15,11 @@ import (
// CloudwatchUploader receives CloudwatchBatches on its input channel,
// and sends them on to the AWS Cloudwatch Logs endpoint.
type CloudwatchUploader struct {
Input chan CloudwatchBatch
svc *cloudwatchlogs.CloudWatchLogs
tokens map[string]string
debugSet bool
Input chan CloudwatchBatch
adapter *CloudwatchAdapter
svc *cloudwatchlogs.CloudWatchLogs
tokens map[string]string
debugSet bool
}

func NewCloudwatchUploader(adapter *CloudwatchAdapter) *CloudwatchUploader {
@@ -38,9 +39,10 @@ func NewCloudwatchUploader(adapter *CloudwatchAdapter) *CloudwatchUploader {
region)
}
uploader := CloudwatchUploader{
Input: make(chan CloudwatchBatch),
tokens: map[string]string{},
Input: make(chan CloudwatchBatch),
tokens: map[string]string{},
debugSet: debugSet,
adapter: adapter,
svc: cloudwatchlogs.New(session.New(),
&aws.Config{Region: aws.String(region)}),
}
@@ -123,6 +125,13 @@ func (u *CloudwatchUploader) getSequenceToken(msg CloudwatchMessage) (*string,
if err != nil {
return nil, err
}

if retentionDays, retentionDaysConfigured := u.adapter.retentiondays[group]; retentionDaysConfigured {
err = u.createGroupRetentionPolicy(group, retentionDays)
if err != nil {
return nil, err
}
}
}
params := &cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: aws.String(group),
@@ -174,6 +183,18 @@ func (u *CloudwatchUploader) createGroup(group string) error {
return nil
}

func (u *CloudwatchUploader) createGroupRetentionPolicy(group string, retentionInDays int64) error {
u.log("Creating group retention policy for %s, days: %d...", group, retentionInDays)
params := &cloudwatchlogs.PutRetentionPolicyInput{
LogGroupName: aws.String(group),
RetentionInDays: aws.Int64(retentionInDays),
}
if _, err := u.svc.PutRetentionPolicy(params); err != nil {
return err
}
return nil
}

func (u *CloudwatchUploader) createStream(group, stream string) error {
u.log("Creating stream for group %s, stream %s...", group, stream)
params := &cloudwatchlogs.CreateLogStreamInput{