Skip to content

Commit

Permalink
Merge branch 'master' into support_dataproc_clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
blentz authored Aug 26, 2021
2 parents ef2cebb + 4fca7cc commit e762b83
Show file tree
Hide file tree
Showing 21 changed files with 350 additions and 144 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ vet:
GO111MODULE=on go vet -mod=vendor ./...

test:
GO111MODULE=on go test -mod=vendor -timeout 30s -coverprofile coverage -race
GO111MODULE=on go test -mod=vendor -timeout 30s -coverprofile coverage -race ./...

_build: build-darwin build-linux

Expand Down
58 changes: 56 additions & 2 deletions aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/service/cloudformation"
"github.com/hortonworks/cloud-haunter/utils"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/service/cloudformation"
"github.com/hortonworks/cloud-haunter/utils"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/autoscaling"
Expand Down Expand Up @@ -454,6 +455,19 @@ func deleteVolumes(ec2Clients map[string]ec2Client, volumes []*types.Disk) []err
if ctx.DryRun {
log.Infof("[AWS] Dry-run set, volume is not deleted: %s:%s, region: %s", vol.Name, vol.ID, region)
} else {
log.Infof("[AWS] Initiate delete volume: %s:%s", vol.Name, vol.ID)
var detachError error
if vol.State == types.InUse {
log.Infof("[AWS] Volume %s:%s is in-use, trying to detach", vol.Name, vol.ID)
if _, detachError = ec2Client.DetachVolume(&ec2.DetachVolumeInput{VolumeId: &vol.ID}); detachError == nil {
detachError = waitForVolumeUnusedState(ec2Client, vol)
}
}

if detachError != nil {
log.Infof("[AWS] Skip volume %s:%s as it can not be detached by [%s].", vol.Name, vol.ID, detachError)
continue
}
log.Infof("[AWS] Delete volume: %s:%s", vol.Name, vol.ID)
if _, err := ec2Client.DeleteVolume(&ec2.DeleteVolumeInput{VolumeId: &vol.ID}); err != nil {
errChan <- err
Expand All @@ -475,6 +489,26 @@ func deleteVolumes(ec2Clients map[string]ec2Client, volumes []*types.Disk) []err
return errs
}

func waitForVolumeUnusedState(ec2Client ec2Client, vol *types.Disk) error {
log.Infof("[AWS] Waiting for Volume %s:%s 'available' state...", vol.Name, vol.ID)
//Polling state max 10 times with 1 sec interval
var counter int = 0
d, e := getDisk(ec2Client, vol.ID)
for e == nil && d.State != types.Unused && counter < 10 {
time.Sleep(1 * time.Second)
d, e = getDisk(ec2Client, vol.ID)
counter++
}
if e != nil {
return errors.New(fmt.Sprintf("Detach verification failed: %s", e))
} else if d.State != types.Unused {
return errors.New(fmt.Sprintf("Detach verification failed, disk state is: %s", d.State))
} else {
log.Infof("[AWS] Volume %s:%s is detached so it can be deleted.", vol.Name, vol.ID)
}
return nil
}

func deleteImages(ec2Clients map[string]ec2Client, images []*types.Image) []error {
regionImages := map[string][]*types.Image{}
for _, image := range images {
Expand Down Expand Up @@ -535,6 +569,7 @@ type ec2Client interface {
DeleteVolume(input *ec2.DeleteVolumeInput) (*ec2.DeleteVolumeOutput, error)
DescribeImages(input *ec2.DescribeImagesInput) (*ec2.DescribeImagesOutput, error)
DeregisterImage(input *ec2.DeregisterImageInput) (*ec2.DeregisterImageOutput, error)
DetachVolume(input *ec2.DetachVolumeInput) (*ec2.VolumeAttachment, error)
}

type cfClient interface {
Expand Down Expand Up @@ -672,6 +707,21 @@ func getImages(ec2Clients map[string]ec2Client) ([]*types.Image, error) {
return images, nil
}

func getDisk(ec2Client ec2Client, volumeId string) (*types.Disk, error) {
result, err := ec2Client.DescribeVolumes(&ec2.DescribeVolumesInput{VolumeIds: []*string{&volumeId}})
if err != nil {
log.Errorf("[AWS] Failed to fetch the volume, err: %s", err)
return nil, err
}
log.Debugf("[AWS] Processing volumes (%d): [%s]", len(result.Volumes), result.Volumes)

if len(result.Volumes) == 0 {
return nil, errors.New(fmt.Sprintf("Volume not found with id '%s'", volumeId))
}
return newDisk(result.Volumes[0]), nil

}

func getDisks(ec2Clients map[string]ec2Client) ([]*types.Disk, error) {
diskChan := make(chan *types.Disk)
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -768,6 +818,7 @@ func getAccesses(iamClient iamClient) ([]*types.Access, error) {
Name: name,
Owner: *akm.UserName,
Created: getCreated(akm.CreateDate),
Tags: types.Tags{},
})
}
}
Expand Down Expand Up @@ -1029,6 +1080,8 @@ func newDisk(volume *ec2.Volume) *types.Disk {
Created: getCreated(volume.CreateTime),
Size: *volume.Size,
Type: *volume.VolumeType,
Owner: tags[ctx.OwnerLabel],
Tags: tags,
}
}

Expand All @@ -1045,6 +1098,7 @@ func newImage(image *ec2.Image, region string) *types.Image {
CloudType: types.AWS,
Region: region,
Created: createdAt,
Tags: getEc2Tags(image.Tags),
}
}

Expand Down
4 changes: 4 additions & 0 deletions aws/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ func (t mockEc2Client) DeleteVolume(input *ec2.DeleteVolumeInput) (*ec2.DeleteVo
return nil, nil
}

func (t mockEc2Client) DetachVolume(input *ec2.DetachVolumeInput) (*ec2.VolumeAttachment, error) {
return nil, nil
}

func (t mockEc2Client) DeregisterImage(input *ec2.DeregisterImageInput) (*ec2.DeregisterImageOutput, error) {
t.deregisterImagesChannel <- *input.ImageId
return nil, nil
Expand Down
1 change: 1 addition & 0 deletions azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func newImage(image compute.Image) *types.Image {
Name: *image.Name,
Region: *image.Location,
CloudType: types.AZURE,
Tags: utils.ConvertTags(image.Tags),
}
}

Expand Down
4 changes: 2 additions & 2 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var DryRun = false
var Verbose = false

// IgnoreLabelDisabled is a global flag for enabling/disabling ignore label usage
var IgnoreLabelDisabled = true
var IgnoreLabelDisabled = false

// Operations contains all the available operations
var Operations = make(map[types.OpType]types.Operation)
Expand All @@ -51,4 +51,4 @@ var Dispatchers = make(map[string]types.Dispatcher)
var Actions = make(map[types.ActionType]types.Action)

// FilterConfig contains the include/exclude configurations from config file
var FilterConfig *types.FilterConfig
var FilterConfig types.IFilterConfig
175 changes: 56 additions & 119 deletions filter/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/hortonworks/cloud-haunter/types"
"github.com/hortonworks/cloud-haunter/utils"
log "github.com/sirupsen/logrus"
"reflect"
)

func filter(filterName string, items []types.CloudItem, filterType types.FilterConfigType, isNeeded func(types.CloudItem) bool) []types.CloudItem {
Expand All @@ -28,138 +29,74 @@ func filter(filterName string, items []types.CloudItem, filterType types.FilterC
return filtered
}

func isFilterMatch(filterName string, item types.CloudItem, filterType types.FilterConfigType, filterConfig *types.FilterConfig) bool {
switch item.GetItem().(type) {
case types.Instance:
inst := item.GetItem().(types.Instance)
name := item.GetName()
ignoreLabelFound := utils.IsAnyMatch(inst.Tags, ctx.IgnoreLabel)
if ignoreLabelFound {
log.Debugf("[%s] Found ignore label on item: %s, label: %s", filterName, name, ctx.IgnoreLabel)
if ctx.IgnoreLabelDisabled {
log.Debugf("[%s] Ignore label usage is disabled, continuing to apply filter on item: %s", filterName, name)
} else {
if filterType.IsInclusive() {
log.Debugf("[%s] inclusive filter applied on item: %s", filterName, name)
return false
}
log.Debugf("[%s] exclusive filter applied on item: %s", filterName, name)
return true
}
}
filtered, applied := applyFilterConfig(filterConfig, filterType, item, filterName, inst.Tags)
if applied {
return filtered
}
case types.Stack:
stack := item.GetItem().(types.Stack)
name := item.GetName()
ignoreLabelFound := utils.IsAnyMatch(stack.Tags, ctx.IgnoreLabel)
if ignoreLabelFound {
log.Debugf("[%s] Found ignore label on item: %s, label: %s", filterName, name, ctx.IgnoreLabel)
if ctx.IgnoreLabelDisabled {
log.Debugf("[%s] Ignore label usage is disabled, continuing to apply filter on item: %s", filterName, name)
} else {
if filterType.IsInclusive() {
log.Debugf("[%s] inclusive filter applied on item: %s", filterName, name)
return false
}
log.Debugf("[%s] exclusive filter applied on item: %s", filterName, name)
return true
func isFilterMatch(filterName string, item types.CloudItem, filterType types.FilterConfigType, filterConfig types.IFilterConfig) bool {
name := item.GetName()
ignoreLabelFound := utils.IsAnyMatch(item.GetTags(), ctx.IgnoreLabel)
if ignoreLabelFound {
log.Debugf("[%s] Found ignore label on item: %s, label: %s", filterName, name, ctx.IgnoreLabel)
if ctx.IgnoreLabelDisabled {
log.Debugf("[%s] Ignore label usage is disabled, continuing to apply filter on item: %s", filterName, name)
} else {
if filterType.IsInclusive() {
log.Debugf("[%s] inclusive filter applied on item: %s", filterName, name)
return false
}
log.Debugf("[%s] exclusive filter applied on item: %s", filterName, name)
return true
}
filtered, applied := applyFilterConfig(filterConfig, filterType, item, filterName, stack.Tags)
if applied {
return filtered
}
}

if filterConfig == nil {
return false
}

var filterEntityType types.FilterEntityType

switch item.GetItem().(type) {
case types.Access:
accessFilter, _ := getFilterConfigs(filterConfig, filterType)
if accessFilter != nil {
switch item.GetCloudType() {
case types.AWS:
return isNameOrOwnerMatch(filterName, item, accessFilter.Aws.Names, accessFilter.Aws.Owners)
case types.AZURE:
return isNameOrOwnerMatch(filterName, item, accessFilter.Azure.Names, accessFilter.Azure.Owners)
case types.GCP:
return isNameOrOwnerMatch(filterName, item, accessFilter.Gcp.Names, accessFilter.Gcp.Owners)
default:
log.Warnf("[%s] Cloud type not supported: %s", filterName, item.GetCloudType())
}
}
case types.Database:
database := item.GetItem().(types.Database)
name := item.GetName()
ignoreLabelFound := utils.IsAnyMatch(database.Tags, ctx.IgnoreLabel)
if ignoreLabelFound {
log.Debugf("[%s] Found ignore label on item: %s, label: %s", filterName, name, ctx.IgnoreLabel)
if ctx.IgnoreLabelDisabled {
log.Debugf("[%s] Ignore label usage is disabled, continuing to apply filter on item: %s", filterName, name)
} else {
if filterType.IsInclusive() {
log.Debugf("[%s] inclusive filter applied on item: %s", filterName, name)
return false
}
log.Debugf("[%s] exclusive filter applied on item: %s", filterName, name)
return true
}
}
filtered, applied := applyFilterConfig(filterConfig, filterType, item, filterName, database.Tags)
if applied {
return filtered
if filterType.IsInclusive() {
filterEntityType = types.IncludeAccess
} else {
filterEntityType = types.ExcludeAccess
}
case types.Disk:
filtered, applied := applyFilterConfig(filterConfig, filterType, item, filterName, types.Tags{})
if applied {
return filtered
case types.Instance, types.Stack, types.Database, types.Disk:
if filterType.IsInclusive() {
filterEntityType = types.IncludeInstance
} else {
filterEntityType = types.ExcludeInstance
}
default:
log.Warnf("Filtering is not implemented for type %s", reflect.TypeOf(item))
return false
}
return false
}

func applyFilterConfig(filterConfig *types.FilterConfig, filterType types.FilterConfigType, item types.CloudItem, filterName string, tags types.Tags) (applied, filtered bool) {
_, instanceFilter := getFilterConfigs(filterConfig, filterType)
if instanceFilter != nil {
switch item.GetCloudType() {
case types.AWS:
return isMatchWithIgnores(filterName, item, tags,
instanceFilter.Aws.Names, instanceFilter.Aws.Owners, instanceFilter.Aws.Labels), true
case types.AZURE:
return isMatchWithIgnores(filterName, item, tags,
instanceFilter.Azure.Names, instanceFilter.Azure.Owners, instanceFilter.Azure.Labels), true
case types.GCP:
return isMatchWithIgnores(filterName, item, tags,
instanceFilter.Gcp.Names, instanceFilter.Gcp.Owners, instanceFilter.Gcp.Labels), true
default:
log.Warnf("[%s] Cloud type not supported: %s", filterName, item.GetCloudType())
}
filtered, applied := false, false

if names := filterConfig.GetFilterValues(filterEntityType, item.GetCloudType(), types.Name); names != nil {
log.Debugf("[%s] filtering item %s to names [%s]", filterName, item.GetName(), names)
filtered, applied = filtered || utils.IsStartsWith(item.GetName(), names...), true
}
return false, false
}

func getFilterConfigs(filterConfig *types.FilterConfig, filterType types.FilterConfigType) (accessConfig *types.FilterAccessConfig, instanceConfig *types.FilterInstanceConfig) {
if filterConfig != nil {
if filterType.IsInclusive() {
return filterConfig.IncludeAccess, filterConfig.IncludeInstance
}
return filterConfig.ExcludeAccess, filterConfig.ExcludeInstance
if owners := filterConfig.GetFilterValues(filterEntityType, item.GetCloudType(), types.Owner); owners != nil {
log.Debugf("[%s] filtering item %s to owners [%s]", filterName, item.GetName(), owners)
filtered, applied = filtered || utils.IsStartsWith(item.GetOwner(), owners...), true
}
return nil, nil
}

func isMatchWithIgnores(filterName string, item types.CloudItem, tags map[string]string, names, owners []string, labels []string) bool {
if isNameOrOwnerMatch(filterName, item, names, owners) || utils.IsAnyStartsWith(tags, labels...) {
log.Debugf("[%s] item %s match with name/owner or tag %s", filterName, item.GetName(), labels)
return true
if labels := filterConfig.GetFilterValues(filterEntityType, item.GetCloudType(), types.Label); labels != nil {
log.Debugf("[%s] filtering item %s to labels [%s]", filterName, item.GetName(), labels)
filtered, applied = filtered || utils.IsAnyStartsWith(item.GetTags(), labels...), true
}
log.Debugf("[%s] item %s does not match with name/owner or tag %s", filterName, item.GetName(), labels)
return false
}

func isNameOrOwnerMatch(filterName string, item types.CloudItem, names, owners []string) bool {
if utils.IsStartsWith(item.GetName(), names...) || utils.IsStartsWith(item.GetOwner(), owners...) {
log.Debugf("[%s] item %s match with filter config name %s or owner %s", filterName, item.GetName(), names, owners)
return true
if applied {
if filtered {
log.Debugf("[%s] item %s matches filter", filterName, item.GetName())
} else {
log.Debugf("[%s] item %s does not match filter", filterName, item.GetName())
}
return filtered
} else {
log.Debugf("[%s] item %s could not be filtered", filterName, item.GetName())
}
log.Debugf("[%s] item %s does not match with filter config name %s or owner %s", filterName, item.GetName(), names, owners)

return false
}
4 changes: 4 additions & 0 deletions filter/ownerless.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (o ownerless) Execute(items []types.CloudItem) []types.CloudItem {
clust := item.(*types.Cluster)
match := !utils.IsAnyMatch(clust.Tags, ctx.OwnerLabel)
log.Debugf("[OWNERLESS] Cluster: %s match: %v (%s)", clust.Name, match, clust.State)
case types.Disk:
disk := item.(*types.Disk)
match := len(disk.Owner) == 0
log.Debugf("[OWNERLESS] Disk: %s match: %v", disk.Name, match)
return match
default:
log.Fatalf("[OWNERLESS] Filter does not apply for cloud item: %s", item.GetName())
Expand Down
Loading

0 comments on commit e762b83

Please sign in to comment.