Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Migrate Hive Plugin to Service Plugins Interface #132

Draft
wants to merge 39 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f188499
Proposal: Common plugin interface for all service plugins
Oct 7, 2019
c3d4a20
Updated mocks
Nov 13, 2019
3858d9f
Merge branch 'service-plugins-api' of github.com:lyft/flyteplugins in…
EngHabu Apr 13, 2020
6a0594a
PR Comments
EngHabu Apr 14, 2020
018f5a7
wip
EngHabu Apr 15, 2020
0c3a624
Add remote plugin interface and implemention
EngHabu May 1, 2020
2b3e004
make properties config-friendly
EngHabu May 1, 2020
a652a25
wip
EngHabu May 13, 2020
77a29c4
Merge branch 'master' of github.com:lyft/flyteplugins into service-pl…
EngHabu May 13, 2020
35bb472
Rename remote.PluginContext
EngHabu May 13, 2020
106424b
generate
EngHabu May 13, 2020
7f95e25
merge parent
EngHabu May 13, 2020
be93b10
wip
EngHabu May 13, 2020
140a973
wip
EngHabu May 15, 2020
63b0d29
Merge branch 'service-plugins' of github.com:lyft/flyteplugins into s…
EngHabu May 15, 2020
a373169
adapt
EngHabu May 15, 2020
8b31f2d
Simplify ResourceMeta
EngHabu May 15, 2020
f196c01
Merge branch 'service-plugins' of github.com:lyft/flyteplugins into s…
EngHabu May 15, 2020
4f9121c
more simplification
EngHabu May 15, 2020
3c0190c
delete old code
EngHabu May 15, 2020
08c7d25
build
EngHabu May 15, 2020
f5cb12b
Merge branch 'service-plugins' of github.com:lyft/flyteplugins into s…
EngHabu May 15, 2020
f298180
typo
EngHabu May 15, 2020
1c47782
Merge branch 'service-plugins' of github.com:lyft/flyteplugins into s…
EngHabu May 15, 2020
a0f02ed
wip
EngHabu May 15, 2020
8f3f3c8
wip
EngHabu May 15, 2020
d30081e
wip
EngHabu May 15, 2020
1ead74d
wip
EngHabu May 15, 2020
c4f1a3d
wip
EngHabu May 15, 2020
0e84714
Merge branch 'service-plugins' of github.com:lyft/flyteplugins into s…
EngHabu May 15, 2020
7c0828b
wip
EngHabu May 15, 2020
4b3035c
wip
EngHabu Aug 5, 2020
9283096
Merge master
EngHabu Oct 21, 2020
d5bad5c
Rebase
EngHabu Oct 21, 2020
41f9f35
Updates
EngHabu Oct 21, 2020
1942938
Updates
EngHabu Oct 21, 2020
b31f8a9
Update go mod
EngHabu Oct 22, 2020
2827018
Merge branch 'service-plugins' of github.com:lyft/flyteplugins into s…
EngHabu Oct 22, 2020
0412dd3
cleanup
EngHabu Oct 22, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
k8s.io/api v0.17.3
k8s.io/apimachinery v0.17.3
k8s.io/client-go v11.0.1-0.20190918222721-c0e3722d5cf0+incompatible
k8s.io/utils v0.0.0-20200124190032-861946025e34 // indirect
k8s.io/utils v0.0.0-20200124190032-861946025e34
sigs.k8s.io/controller-runtime v0.5.1
)

Expand Down
206 changes: 19 additions & 187 deletions go.sum

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions go/tasks/pluginmachinery/core/allocationstatus_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions go/tasks/pluginmachinery/core/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,23 @@ import (
"context"
)

type AllocationStatus string
//go:generate enumer -type=AllocationStatus -trimprefix=AllocationStatus

type AllocationStatus int

const (
// This is the enum returned when there's an error
AllocationUndefined AllocationStatus = "ResourceGranted"
AllocationUndefined AllocationStatus = iota

// Go for it
AllocationStatusGranted AllocationStatus = "ResourceGranted"
AllocationStatusGranted

// This means that no resources are available globally. This is the only rejection message we use right now.
AllocationStatusExhausted AllocationStatus = "ResourceExhausted"
AllocationStatusExhausted

// We're not currently using this - but this would indicate that things globally are okay, but that your
// own namespace is too busy
AllocationStatusNamespaceQuotaExceeded AllocationStatus = "NamespaceQuotaExceeded"
AllocationStatusNamespaceQuotaExceeded
)

const namespaceSeparator = ":"
Expand Down
101 changes: 101 additions & 0 deletions go/tasks/pluginmachinery/internal/remote/allocation_token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package remote

import (
"context"
"fmt"

clock2 "k8s.io/utils/clock"

"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/remote"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)

var (
clock clock2.Clock
)

func SetClockForTest(clck clock2.Clock) {
clock = clck
}

func init() {
clock = clock2.RealClock{}
}

func allocateToken(ctx context.Context, p remote.Plugin, tCtx core.TaskExecutionContext, state *State, metrics Metrics) (
newState *State, phaseInfo core.PhaseInfo, err error) {
if len(p.GetPluginProperties().ResourceQuotas) == 0 {
// No quota, return success
return &State{
AllocationTokenRequestStartTime: clock.Now(),
Phase: PhaseAllocationTokenAcquired,
}, core.PhaseInfo{}, nil
}

ns, constraints, err := p.ResourceRequirements(ctx, tCtx)
if err != nil {
logger.Errorf(ctx, "Failed to calculate resource requirements for task. Error: %v", err)
return nil, core.PhaseInfo{}, err
}

token := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, ns, token, constraints)
if err != nil {
logger.Errorf(ctx, "Failed to allocate resources for task. Error: %v", err)
return nil, core.PhaseInfo{}, err
}

switch allocationStatus {
case core.AllocationStatusGranted:
metrics.AllocationGranted.Inc(ctx)
metrics.ResourceWaitTime.Observe(float64(clock.Since(state.AllocationTokenRequestStartTime).Milliseconds()))
return &State{
AllocationTokenRequestStartTime: clock.Now(),
Phase: PhaseAllocationTokenAcquired,
}, core.PhaseInfo{}, nil
case core.AllocationStatusNamespaceQuotaExceeded:
case core.AllocationStatusExhausted:
metrics.AllocationNotGranted.Inc(ctx)
logger.Infof(ctx, "Couldn't allocate token because allocation status is [%v].", allocationStatus.String())
startTime := state.AllocationTokenRequestStartTime
if startTime.IsZero() {
startTime = clock.Now()
}

return &State{
AllocationTokenRequestStartTime: startTime,
Phase: PhaseNotStarted,
}, core.PhaseInfo{}, nil
default:
return nil, core.PhaseInfo{}, fmt.Errorf("allocation status undefined")
}

return state, core.PhaseInfo{}, nil
}

func releaseToken(ctx context.Context, p remote.Plugin, tCtx core.TaskExecutionContext, metrics Metrics) error {
if len(p.GetPluginProperties().ResourceQuotas) == 0 {
// No quota, return success
return nil
}

ns, _, err := p.ResourceRequirements(ctx, tCtx)
if err != nil {
logger.Errorf(ctx, "Failed to calculate resource requirements for task. Error: %v", err)
return err
}

token := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
err = tCtx.ResourceManager().ReleaseResource(ctx, ns, token)
if err != nil {
metrics.ResourceReleaseFailed.Inc(ctx)
logger.Errorf(ctx, "Failed to release resources for task. Error: %v", err)
return err
}

metrics.ResourceReleased.Inc(ctx)
return nil
}
149 changes: 149 additions & 0 deletions go/tasks/pluginmachinery/internal/remote/allocation_token_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package remote

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/mock"

mocks2 "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/remote/mocks"

"github.com/stretchr/testify/assert"

testing2 "k8s.io/utils/clock/testing"

"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/promutils/labeled"

"github.com/go-test/deep"
"github.com/lyft/flytestdlib/promutils"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/remote"
)

func init() {
labeled.SetMetricKeys(contextutils.NamespaceKey)
}

func newPluginWithProperties(properties remote.PluginProperties) *mocks.Plugin {
m := &mocks.Plugin{}
m.OnGetPluginProperties().Return(properties)
return m
}

func Test_allocateToken(t *testing.T) {
ctx := context.Background()
metrics := newMetrics(promutils.NewTestScope())

tNow := time.Now()
clck := testing2.NewFakeClock(tNow)
SetClockForTest(clck)

tID := &mocks2.TaskExecutionID{}
tID.OnGetGeneratedName().Return("abc")

tMeta := &mocks2.TaskExecutionMetadata{}
tMeta.OnGetTaskExecutionID().Return(tID)

rm := &mocks2.ResourceManager{}
rm.OnAllocateResourceMatch(ctx, core.ResourceNamespace("ns"), "abc", mock.Anything).Return(core.AllocationStatusGranted, nil)
rm.OnAllocateResourceMatch(ctx, core.ResourceNamespace("ns"), "abc2", mock.Anything).Return(core.AllocationStatusExhausted, nil)

tCtx := &mocks2.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(tMeta)
tCtx.OnResourceManager().Return(rm)

state := &State{}

p := newPluginWithProperties(remote.PluginProperties{
ResourceQuotas: map[core.ResourceNamespace]int{
"ns": 1,
},
})

t.Run("no quota", func(t *testing.T) {
p := newPluginWithProperties(remote.PluginProperties{ResourceQuotas: nil})
gotNewState, _, err := allocateToken(ctx, p, nil, nil, metrics)
assert.NoError(t, err)
if diff := deep.Equal(gotNewState, &State{
AllocationTokenRequestStartTime: tNow,
Phase: PhaseAllocationTokenAcquired,
}); len(diff) > 0 {
t.Errorf("allocateToken() gotNewState = %v, Diff: %v", gotNewState, diff)
}
})

t.Run("Allocation Successful", func(t *testing.T) {
p.OnResourceRequirements(ctx, tCtx).Return("ns", core.ResourceConstraintsSpec{}, nil)
gotNewState, _, err := allocateToken(ctx, p, tCtx, state, metrics)
assert.NoError(t, err)
if diff := deep.Equal(gotNewState, &State{
AllocationTokenRequestStartTime: tNow,
Phase: PhaseAllocationTokenAcquired,
}); len(diff) > 0 {
t.Errorf("allocateToken() gotNewState = %v, Diff: %v", gotNewState, diff)
}
})

t.Run("Allocation Failed", func(t *testing.T) {
tID := &mocks2.TaskExecutionID{}
tID.OnGetGeneratedName().Return("abc2")

tMeta := &mocks2.TaskExecutionMetadata{}
tMeta.OnGetTaskExecutionID().Return(tID)

rm := &mocks2.ResourceManager{}
rm.OnAllocateResourceMatch(ctx, core.ResourceNamespace("ns"), "abc", mock.Anything).Return(core.AllocationStatusGranted, nil)
rm.OnAllocateResourceMatch(ctx, core.ResourceNamespace("ns"), "abc2", mock.Anything).Return(core.AllocationStatusExhausted, nil)

tCtx := &mocks2.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(tMeta)
tCtx.OnResourceManager().Return(rm)

p.OnResourceRequirements(ctx, tCtx).Return("ns", core.ResourceConstraintsSpec{}, nil)
gotNewState, _, err := allocateToken(ctx, p, tCtx, state, metrics)
assert.NoError(t, err)
if diff := deep.Equal(gotNewState, &State{
AllocationTokenRequestStartTime: tNow,
Phase: PhaseNotStarted,
}); len(diff) > 0 {
t.Errorf("allocateToken() gotNewState = %v, Diff: %v", gotNewState, diff)
}
})
}

func Test_releaseToken(t *testing.T) {
ctx := context.Background()
metrics := newMetrics(promutils.NewTestScope())

tNow := time.Now()
clck := testing2.NewFakeClock(tNow)
SetClockForTest(clck)

tID := &mocks2.TaskExecutionID{}
tID.OnGetGeneratedName().Return("abc")

tMeta := &mocks2.TaskExecutionMetadata{}
tMeta.OnGetTaskExecutionID().Return(tID)

rm := &mocks2.ResourceManager{}
rm.OnAllocateResourceMatch(ctx, core.ResourceNamespace("ns"), "abc", mock.Anything).Return(core.AllocationStatusGranted, nil)
rm.OnAllocateResourceMatch(ctx, core.ResourceNamespace("ns"), "abc2", mock.Anything).Return(core.AllocationStatusExhausted, nil)
rm.OnReleaseResource(ctx, core.ResourceNamespace("ns"), "abc").Return(nil)

tCtx := &mocks2.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(tMeta)
tCtx.OnResourceManager().Return(rm)

p := newPluginWithProperties(remote.PluginProperties{
ResourceQuotas: map[core.ResourceNamespace]int{
"ns": 1,
},
})
p.OnResourceRequirements(ctx, tCtx).Return("ns", core.ResourceConstraintsSpec{}, nil)

assert.NoError(t, releaseToken(ctx, p, tCtx, metrics))
}
Loading