Skip to content

Commit

Permalink
feat: breaking plugins to mods for easier composition (#47)
Browse files Browse the repository at this point in the history
Signed-off-by: Kush <[email protected]>
  • Loading branch information
kushsharma authored Jul 30, 2021
1 parent 4bbe355 commit 7aca79c
Show file tree
Hide file tree
Showing 84 changed files with 6,677 additions and 8,355 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
with:
skip-go-installation: true

test:
unit:
runs-on: ubuntu-latest
steps:
- name: Checkout
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Optimus helps your organization to build & manage data pipelines with ease.
- Dry run query: Before SQL query is scheduled for transformation, during
deployment query will be dry-run to make sure it passes basic sanity
checks
- Sink BigQuery tables to Kafka [using additional hook]
- Sink BigQuery tables to Kafka [using additional plugins]
- Extensibility to support Python transformation
- Git based specification management
- REST/GRPC based specification management
Expand Down
42 changes: 11 additions & 31 deletions api/handler/v1/adapter.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package v1

import (
"context"
"fmt"
"strings"
"time"

"google.golang.org/protobuf/types/known/timestamppb"

"github.com/odpf/optimus/core/tree"

"github.com/golang/protobuf/proto"
Expand All @@ -18,8 +19,7 @@ import (

// Note: all config keys will be converted to upper case automatically
type Adapter struct {
supportedTaskRepo models.TaskPluginRepository
supportedHookRepo models.HookRepo
pluginRepo models.PluginRepository
supportedDatastoreRepo models.DatastoreRepo
}

Expand Down Expand Up @@ -51,7 +51,7 @@ func (adapt *Adapter) FromJobProto(spec *pb.JobSpecification) (models.JobSpec, e
return models.JobSpec{}, err
}

execUnit, err := adapt.supportedTaskRepo.GetByName(spec.TaskName)
execUnit, err := adapt.pluginRepo.GetByName(spec.TaskName)
if err != nil {
return models.JobSpec{}, err
}
Expand Down Expand Up @@ -149,14 +149,6 @@ func prepareWindow(windowSize, windowOffset, truncateTo string) (models.JobSpecT
}

func (adapt *Adapter) ToJobProto(spec models.JobSpec) (*pb.JobSpecification, error) {
if spec.Task.Unit == nil {
return nil, errors.New("task unit cannot be nil")
}
taskSchema, err := spec.Task.Unit.GetTaskSchema(context.Background(), models.GetTaskSchemaRequest{})
if err != nil {
return nil, err
}

adaptedHook, err := adapt.ToHookProto(spec.Hooks)
if err != nil {
return nil, err
Expand All @@ -179,7 +171,7 @@ func (adapt *Adapter) ToJobProto(spec models.JobSpec) (*pb.JobSpecification, err
StartDate: spec.Schedule.StartDate.Format(models.JobDatetimeLayout),
DependsOnPast: spec.Behavior.DependsOnPast,
CatchUp: spec.Behavior.CatchUp,
TaskName: taskSchema.Name,
TaskName: spec.Task.Unit.Info().Name,
WindowSize: spec.Task.Window.SizeString(),
WindowOffset: spec.Task.Window.OffsetString(),
WindowTruncateTo: spec.Task.Window.TruncateTo,
Expand Down Expand Up @@ -325,10 +317,7 @@ func (adapt *Adapter) ToInstanceProto(spec models.InstanceSpec) (*pb.InstanceSpe
Type: pb.InstanceSpecData_Type(pb.InstanceSpecData_Type_value[strings.ToUpper(asset.Type)]),
})
}
schdAt, err := ptypes.TimestampProto(spec.ScheduledAt)
if err != nil {
return nil, err
}
schdAt := timestamppb.New(spec.ScheduledAt)
return &pb.InstanceSpec{
JobName: spec.Job.Name,
ScheduledAt: schdAt,
Expand Down Expand Up @@ -368,7 +357,7 @@ func (adapt *Adapter) FromInstanceProto(conf *pb.InstanceSpec) (models.InstanceS
func (adapt *Adapter) FromHookProto(hooksProto []*pb.JobSpecHook) ([]models.JobSpecHook, error) {
var hooks []models.JobSpecHook
for _, hook := range hooksProto {
hookUnit, err := adapt.supportedHookRepo.GetByName(hook.Name)
hookUnit, err := adapt.pluginRepo.GetByName(hook.Name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -399,12 +388,8 @@ func (adapt *Adapter) ToHookProto(hooks []models.JobSpecHook) (protoHooks []*pb.
})
}

schema, err := hook.Unit.GetHookSchema(context.Background(), models.GetHookSchemaRequest{})
if err != nil {
return nil, err
}
protoHooks = append(protoHooks, &pb.JobSpecHook{
Name: schema.Name,
Name: hook.Unit.Info().Name,
Config: hookConfigs,
})
}
Expand Down Expand Up @@ -451,10 +436,7 @@ func (adapt *Adapter) ToReplayExecutionTreeNode(res *tree.TreeNode) (*pb.ReplayE
}
for _, run := range res.Runs.Values() {
runTime := run.(time.Time)
timestampPb, err := ptypes.TimestampProto(runTime)
if err != nil {
return nil, err
}
timestampPb := timestamppb.New(runTime)
response.Runs = append(response.Runs, timestampPb)
}
for _, dep := range res.Dependents {
Expand All @@ -467,11 +449,9 @@ func (adapt *Adapter) ToReplayExecutionTreeNode(res *tree.TreeNode) (*pb.ReplayE
return response, nil
}

func NewAdapter(supportedTaskRepo models.TaskPluginRepository,
supportedHookRepo models.HookRepo, datastoreRepo models.DatastoreRepo) *Adapter {
func NewAdapter(pluginRepo models.PluginRepository, datastoreRepo models.DatastoreRepo) *Adapter {
return &Adapter{
supportedTaskRepo: supportedTaskRepo,
supportedHookRepo: supportedHookRepo,
pluginRepo: pluginRepo,
supportedDatastoreRepo: datastoreRepo,
}
}
26 changes: 8 additions & 18 deletions api/handler/v1/adapter_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package v1_test

import (
"context"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -31,25 +30,17 @@ func TestAdapter(t *testing.T) {
assert.Equal(t, replayExecutionTreeNode.Dependents[0].JobName, "nested-job-name")
})
t.Run("should successfully parse job spec to and from proto", func(t *testing.T) {
execUnit1 := new(mock.TaskPlugin)
execUnit1.On("GetTaskSchema", context.Background(), models.GetTaskSchemaRequest{}).Return(models.GetTaskSchemaResponse{
execUnit1 := new(mock.BasePlugin)
execUnit1.On("PluginInfo").Return(&models.PluginInfoResponse{
Name: "sample-task",
}, nil)
defer execUnit1.AssertExpectations(t)

allTasksRepo := new(mock.SupportedTaskRepo)
allTasksRepo.On("GetByName", "sample-task").Return(execUnit1, nil)
defer allTasksRepo.AssertExpectations(t)

hookUnit1 := new(mock.HookPlugin)
hookUnit1.On("GetHookSchema", context.Background(), models.GetHookSchemaRequest{}).Return(models.GetHookSchemaResponse{
Name: "sample-hook",
pluginRepo := new(mock.SupportedPluginRepo)
pluginRepo.On("GetByName", "sample-task").Return(&models.Plugin{
Base: execUnit1,
}, nil)
defer hookUnit1.AssertExpectations(t)

allHookRepo := new(mock.SupportedHookRepo)
allHookRepo.On("GetByName", "sample-hook").Return(hookUnit1, nil)
defer allHookRepo.AssertExpectations(t)
adapter := v1.NewAdapter(pluginRepo, nil)

jobSpec := models.JobSpec{
Name: "test-job",
Expand All @@ -76,7 +67,7 @@ func TestAdapter(t *testing.T) {
},
},
Task: models.JobSpecTask{
Unit: execUnit1,
Unit: &models.Plugin{Base: execUnit1},
Config: models.JobSpecConfigs{
{
Name: "DO",
Expand Down Expand Up @@ -106,12 +97,11 @@ func TestAdapter(t *testing.T) {
Value: "this",
},
},
Unit: hookUnit1,
Unit: &models.Plugin{Base: execUnit1},
},
},
}

adapter := v1.NewAdapter(allTasksRepo, allHookRepo, nil)
inProto, err := adapter.ToJobProto(jobSpec)
assert.Nil(t, err)
original, err := adapter.FromJobProto(inProto)
Expand Down
14 changes: 5 additions & 9 deletions api/handler/v1/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"google.golang.org/protobuf/types/known/timestamppb"

"google.golang.org/protobuf/types/known/structpb"

"github.com/odpf/optimus/core/tree"
Expand Down Expand Up @@ -508,10 +510,7 @@ func (sv *RuntimeServiceServer) JobStatus(ctx context.Context, req *pb.JobStatus

var adaptedJobStatus []*pb.JobStatus
for _, jobStatus := range jobStatuses {
ts, err := ptypes.TimestampProto(jobStatus.ScheduledAt)
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: failed to parse time for %s", err.Error(), req.GetJobName())
}
ts := timestamppb.New(jobStatus.ScheduledAt)
adaptedJobStatus = append(adaptedJobStatus, &pb.JobStatus{
State: jobStatus.State.String(),
ScheduledAt: ts,
Expand Down Expand Up @@ -574,11 +573,8 @@ func (sv *RuntimeServiceServer) GetWindow(ctx context.Context, req *pb.GetWindow
return nil, status.Error(codes.Internal, err.Error())
}

windowStart, err1 := ptypes.TimestampProto(window.GetStart(scheduledTime))
windowEnd, err2 := ptypes.TimestampProto(window.GetEnd(scheduledTime))
if err1 != nil || err2 != nil {
return nil, status.Errorf(codes.Internal, "%s: failed to convert timestamp %s", err.Error(), scheduledTime)
}
windowStart := timestamppb.New(window.GetStart(scheduledTime))
windowEnd := timestamppb.New(window.GetEnd(scheduledTime))

return &pb.GetWindowResponse{
Start: windowStart,
Expand Down
Loading

0 comments on commit 7aca79c

Please sign in to comment.